Index: core/src/main/java/org/apache/hama/bsp/join/ComposableInputFormat.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/join/ComposableInputFormat.java (revision 0) +++ core/src/main/java/org/apache/hama/bsp/join/ComposableInputFormat.java (revision 0) @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hama.bsp.join; + +import java.io.IOException; + +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hama.bsp.BSPJob; +import org.apache.hama.bsp.InputFormat; +import org.apache.hama.bsp.InputSplit; + +/** + * Refinement of InputFormat requiring implementors to provide + * ComposableRecordReader instead of RecordReader. 
+ */ +public interface ComposableInputFormat + extends InputFormat { + + ComposableRecordReader getRecordReader(InputSplit split, BSPJob job) + throws IOException; +} Index: core/src/main/java/org/apache/hama/bsp/join/InnerJoinRecordReader.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/join/InnerJoinRecordReader.java (revision 0) +++ core/src/main/java/org/apache/hama/bsp/join/InnerJoinRecordReader.java (revision 0) @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hama.bsp.join; + +import java.io.IOException; + +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.io.WritableComparator; +import org.apache.hama.bsp.BSPJob; + +/** + * Full inner join. + */ +public class InnerJoinRecordReader extends + JoinRecordReader { + + InnerJoinRecordReader(int id, BSPJob job, int capacity, + Class cmpcl) throws IOException { + super(id, job, capacity, cmpcl); + } + + /** + * Return true iff every position of dst is marked written, i.e. the tuple is full (all data sources contain this key). 
+ */ + protected boolean combine(Object[] srcs, TupleWritable dst) { + assert srcs.length == dst.size(); + for (int i = 0; i < srcs.length; ++i) { + if (!dst.has(i)) { + return false; + } + } + return true; + } +} Index: core/src/main/java/org/apache/hama/bsp/join/ResetableIterator.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/join/ResetableIterator.java (revision 0) +++ core/src/main/java/org/apache/hama/bsp/join/ResetableIterator.java (revision 0) @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.bsp.join; + +import java.io.IOException; + +import org.apache.hadoop.io.Writable; + +/** + * This defines an interface to a stateful Iterator that can replay elements + * added to it directly. Note that this does not extend + * {@link java.util.Iterator}. 
+ */ +public interface ResetableIterator { + + public static class EMPTY implements ResetableIterator { + public boolean hasNext() { + return false; + } + + public void reset() { + } + + public void close() throws IOException { + } + + public void clear() { + } + + public boolean next(U val) throws IOException { + return false; + } + + public boolean replay(U val) throws IOException { + return false; + } + + public void add(U item) throws IOException { + throw new UnsupportedOperationException(); + } + } + + /** + * True if a call to next may return a value. This is permitted to return + * false positives, but not false negatives. + */ + public boolean hasNext(); + + /** + * Assign next value to actual. It is required that elements added to a + * ResetableIterator be returned in the same order after a call to + * {@link #reset} (FIFO). + * + * Note that a call to this may fail for nested joins (i.e. more elements + * available, but none satisfying the constraints of the join) + */ + public boolean next(T val) throws IOException; + + /** + * Assign last value returned to actual. + */ + public boolean replay(T val) throws IOException; + + /** + * Set iterator to return to the start of its range. Must be called after + * calling {@link #add} to avoid a ConcurrentModificationException. + */ + public void reset(); + + /** + * Add an element to the collection of elements to iterate over. + */ + public void add(T item) throws IOException; + + /** + * Close datasources and release resources. Calling methods on the iterator + * after calling close has undefined behavior. + */ + // XXX is this necessary? + public void close() throws IOException; + + /** + * Close datasources, but do not release internal resources. Calling this + * method should permit the object to be reused with a different datasource. 
+ */ + public void clear(); + +} Index: core/src/main/java/org/apache/hama/bsp/join/JoinRecordReader.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/join/JoinRecordReader.java (revision 0) +++ core/src/main/java/org/apache/hama/bsp/join/JoinRecordReader.java (revision 0) @@ -0,0 +1,114 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hama.bsp.join; + +import java.io.IOException; +import java.util.PriorityQueue; + +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.io.WritableComparator; +import org.apache.hadoop.io.WritableUtils; +import org.apache.hama.bsp.BSPJob; + +/** + * Base class for Composite joins returning Tuples of arbitrary Writables. 
+ */ +public abstract class JoinRecordReader extends + CompositeRecordReader implements + ComposableRecordReader { + + public JoinRecordReader(int id, BSPJob job, int capacity, + Class cmpcl) throws IOException { + super(id, capacity, cmpcl); + setConf(job.getConfiguration()); + } + + /** + * Emit the next key, value pair as defined by the child RecordReaders + * and operation associated with this composite RR; returns false once neither the collector nor the reader queue yields a tuple. + */ + public boolean next(K key, TupleWritable value) throws IOException { + if (jc.flush(value)) { + WritableUtils.cloneInto(key, jc.key()); + return true; + } + jc.clear(); + K iterkey = createKey(); + final PriorityQueue> q = getRecordReaderQueue(); + while (!q.isEmpty()) { + fillJoinCollector(iterkey); + jc.reset(iterkey); + if (jc.flush(value)) { + WritableUtils.cloneInto(key, jc.key()); + return true; + } + jc.clear(); + } + return false; + } + + /** {@inheritDoc} */ + public TupleWritable createValue() { + return createInternalValue(); + } + + /** + * Return an iterator wrapping the JoinCollector. + */ + protected ResetableIterator getDelegate() { + return new JoinDelegationIterator(); + } + + /** + * Since the JoinCollector is effecting our operation, we need only provide an + * iterator proxy wrapping its operation. 
+ */ + protected class JoinDelegationIterator implements + ResetableIterator { + + public boolean hasNext() { + return jc.hasNext(); + } + + public boolean next(TupleWritable val) throws IOException { + return jc.flush(val); + } + + public boolean replay(TupleWritable val) throws IOException { + return jc.replay(val); + } + + public void reset() { + jc.reset(jc.key()); + } + + public void add(TupleWritable item) throws IOException { + throw new UnsupportedOperationException(); + } + + public void close() throws IOException { + jc.close(); + } + + public void clear() { + jc.clear(); + } + } +} Index: core/src/main/java/org/apache/hama/bsp/join/MultiFilterRecordReader.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/join/MultiFilterRecordReader.java (revision 0) +++ core/src/main/java/org/apache/hama/bsp/join/MultiFilterRecordReader.java (revision 0) @@ -0,0 +1,153 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hama.bsp.join; + +import java.io.IOException; +import java.util.PriorityQueue; + +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.io.WritableComparator; +import org.apache.hadoop.io.WritableUtils; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hama.bsp.BSPJob; +import org.apache.hama.bsp.RecordReader; + +/** + * Base class for Composite join returning values derived from multiple sources, + * but generally not tuples. + */ +public abstract class MultiFilterRecordReader + extends CompositeRecordReader implements + ComposableRecordReader { + + private Class valueclass; + private TupleWritable ivalue; + + public MultiFilterRecordReader(int id, BSPJob job, int capacity, + Class cmpcl) throws IOException { + super(id, capacity, cmpcl); + setConf(job.getConfiguration()); + } + + /** + * For each tuple emitted, return a value (typically one of the values in the + * tuple). Modifying the Writables in the tuple is permitted and unlikely to + * affect join behavior in most cases, but it is not recommended. It's safer + * to clone first. + */ + protected abstract V emit(TupleWritable dst) throws IOException; + + /** + * Default implementation offers {@link #emit} every Tuple from the collector + * (the outer join of child RRs). 
+ */ + protected boolean combine(Object[] srcs, TupleWritable dst) { + return true; + } + + /** {@inheritDoc} */ + public boolean next(K key, V value) throws IOException { + if (jc.flush(ivalue)) { + WritableUtils.cloneInto(key, jc.key()); + WritableUtils.cloneInto(value, emit(ivalue)); + return true; + } + jc.clear(); + K iterkey = createKey(); + final PriorityQueue> q = getRecordReaderQueue(); + while (!q.isEmpty()) { + fillJoinCollector(iterkey); + jc.reset(iterkey); + if (jc.flush(ivalue)) { + WritableUtils.cloneInto(key, jc.key()); + WritableUtils.cloneInto(value, emit(ivalue)); + return true; + } + jc.clear(); + } + return false; + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + // Explicit check for value class agreement; the internal tuple ivalue is also created lazily here + public V createValue() { + if (null == valueclass) { + final Class cls = kids[0].createValue().getClass(); + for (RecordReader rr : kids) { + if (!cls.equals(rr.createValue().getClass())) { + throw new ClassCastException("Child value classes fail to agree"); + } + } + valueclass = cls.asSubclass(Writable.class); + ivalue = createInternalValue(); + } + return (V) ReflectionUtils.newInstance(valueclass, null); + } + + /** + * Return an iterator returning a single value from the tuple. + * + * @see MultiFilterDelegationIterator + */ + protected ResetableIterator getDelegate() { + return new MultiFilterDelegationIterator(); + } + + /** + * Proxy the JoinCollector, but include callback to emit. 
+ */ + protected class MultiFilterDelegationIterator implements ResetableIterator { + + public boolean hasNext() { + return jc.hasNext(); + } + + public boolean next(V val) throws IOException { + boolean ret; + if (ret = jc.flush(ivalue)) { + WritableUtils.cloneInto(val, emit(ivalue)); + } + return ret; + } + + public boolean replay(V val) throws IOException { + WritableUtils.cloneInto(val, emit(ivalue)); + return true; + } + + public void reset() { + jc.reset(jc.key()); + } + + public void add(V item) throws IOException { + throw new UnsupportedOperationException(); + } + + public void close() throws IOException { + jc.close(); + } + + public void clear() { + jc.clear(); + } + } + +} Index: core/src/main/java/org/apache/hama/bsp/join/TupleWritable.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/join/TupleWritable.java (revision 0) +++ core/src/main/java/org/apache/hama/bsp/join/TupleWritable.java (revision 0) @@ -0,0 +1,231 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hama.bsp.join; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.Iterator; +import java.util.NoSuchElementException; + +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableUtils; + +/** + * Writable type storing multiple {@link org.apache.hadoop.io.Writable}s. + * + * This is *not* a general-purpose tuple type. In almost all cases, users are + * encouraged to implement their own serializable types, which can perform + * better validation and provide more efficient encodings than this class is + * capable of. TupleWritable relies on the join framework for type safety and + * assumes its instances will rarely be persisted, assumptions not only + * incompatible with, but contrary to the general case. + * + * @see org.apache.hadoop.io.Writable + */ +public class TupleWritable implements Writable, Iterable { + + private long written; + private Writable[] values; + + /** + * Create an empty tuple with no allocated storage for writables. + */ + public TupleWritable() { + } + + /** + * Initialize tuple with storage; unknown whether any of them contain + * "written" values. + */ + public TupleWritable(Writable[] vals) { + written = 0L; + values = vals; + } + + /** + * Return true if tuple has an element at the position provided. + */ + public boolean has(int i) { + return 0 != ((1L << i) & written); + } + + /** + * Get ith Writable from Tuple. + */ + public Writable get(int i) { + return values[i]; + } + + /** + * The number of children in this Tuple. 
+ */ + public int size() { + return values.length; + } + + /** + * {@inheritDoc} + */ + public boolean equals(Object other) { + if (other instanceof TupleWritable) { + TupleWritable that = (TupleWritable) other; + if (this.size() != that.size() || this.written != that.written) { + return false; + } + for (int i = 0; i < values.length; ++i) { + if (!has(i)) + continue; + if (!values[i].equals(that.get(i))) { + return false; + } + } + return true; + } + return false; + } + + public int hashCode() { + assert false : "hashCode not designed"; + return (int) written; + } + + /** + * Return an iterator over the elements in this tuple. Note that this doesn't + * flatten the tuple; one may receive tuples from this iterator. + */ + public Iterator iterator() { + final TupleWritable t = this; + return new Iterator() { + long i = written; + long last = 0L; + + public boolean hasNext() { + return 0L != i; + } + + public Writable next() { + last = Long.lowestOneBit(i); + if (0 == last) + throw new NoSuchElementException(); + i ^= last; + // numberOfTrailingZeros rtn 64 only for a zero argument; last is non-zero here, so % 64 is a no-op + return t.get(Long.numberOfTrailingZeros(last) % 64); + } + + public void remove() { + t.written ^= last; + if (t.has(Long.numberOfTrailingZeros(last))) { + throw new IllegalStateException("Attempt to remove non-existent val"); + } + } + }; + } + + /** + * Convert Tuple to String as in the following. + * [v1,v2,...,vn] (a position not marked written renders as an empty string) + */ + public String toString() { + StringBuffer buf = new StringBuffer("["); + for (int i = 0; i < values.length; ++i) { + buf.append(has(i) ? values[i].toString() : ""); + buf.append(","); + } + if (values.length != 0) + buf.setCharAt(buf.length() - 1, ']'); + else + buf.append(']'); + return buf.toString(); + } + + // Writable + + /** + * Writes each Writable to out. 
TupleWritable format: + * {@code + * count written type1 ... typen obj1 ... objn (objects only for written positions) 
+ * } + */ + public void write(DataOutput out) throws IOException { + WritableUtils.writeVInt(out, values.length); + WritableUtils.writeVLong(out, written); + for (int i = 0; i < values.length; ++i) { + Text.writeString(out, values[i].getClass().getName()); + } + for (int i = 0; i < values.length; ++i) { + if (has(i)) { + values[i].write(out); + } + } + } + + /** + * {@inheritDoc} + */ + @SuppressWarnings("unchecked") + // No static typeinfo on Tuples + public void readFields(DataInput in) throws IOException { + int card = WritableUtils.readVInt(in); + values = new Writable[card]; + written = WritableUtils.readVLong(in); + Class[] cls = new Class[card]; + try { + for (int i = 0; i < card; ++i) { + cls[i] = Class.forName(Text.readString(in)).asSubclass(Writable.class); + } + for (int i = 0; i < card; ++i) { + values[i] = cls[i].newInstance(); + if (has(i)) { + values[i].readFields(in); + } + } + } catch (ClassNotFoundException e) { + throw (IOException) new IOException("Failed tuple init").initCause(e); + } catch (IllegalAccessException e) { + throw (IOException) new IOException("Failed tuple init").initCause(e); + } catch (InstantiationException e) { + throw (IOException) new IOException("Failed tuple init").initCause(e); + } + } + + /** + * Record that the tuple contains an element at the position provided. + */ + void setWritten(int i) { + written |= 1L << i; + } + + /** + * Record that the tuple does not contain an element at the position provided. + */ + void clearWritten(int i) { + written &= -1 ^ (1L << i); + } + + /** + * Clear any record of which writables have been written to, without releasing + * storage. 
+ */ + void clearWritten() { + written = 0L; + } + +} Index: core/src/main/java/org/apache/hama/bsp/join/CompositeRecordReader.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/join/CompositeRecordReader.java (revision 0) +++ core/src/main/java/org/apache/hama/bsp/join/CompositeRecordReader.java (revision 0) @@ -0,0 +1,458 @@ +/** * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hama.bsp.join; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.PriorityQueue; + +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.io.WritableComparator; +import org.apache.hadoop.io.WritableUtils; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hama.bsp.RecordReader; + +/** + * A RecordReader that can effect joins of RecordReaders sharing a common key + * type and partitioning. 
+ */ +public abstract class CompositeRecordReader as children +X extends Writable> // emits Writables of this type + implements Configurable { + + private int id; + private Configuration conf; + private final ResetableIterator EMPTY = new ResetableIterator.EMPTY(); + + private WritableComparator cmp; + private Class keyclass; + private PriorityQueue> q; + + protected final JoinCollector jc; + protected final ComposableRecordReader[] kids; + + protected abstract boolean combine(Object[] srcs, TupleWritable value); + + /** + * Create a RecordReader with capacity children to position + * id in the parent reader. The id of a root CompositeRecordReader is + * -1 by convention, but relying on this is not recommended. + */ + @SuppressWarnings("unchecked") + // Generic array assignment + public CompositeRecordReader(int id, int capacity, + Class cmpcl) throws IOException { + assert capacity > 0 : "Invalid capacity"; + this.id = id; + if (null != cmpcl) { + cmp = ReflectionUtils.newInstance(cmpcl, null); + q = new PriorityQueue>(3, + new Comparator>() { + public int compare(ComposableRecordReader o1, + ComposableRecordReader o2) { + return cmp.compare(o1.key(), o2.key()); + } + }); + } + jc = new JoinCollector(capacity); + kids = new ComposableRecordReader[capacity]; + } + + /** + * Return the position in the collector this class occupies. + */ + public int id() { + return id; + } + + /** + * {@inheritDoc} + */ + public void setConf(Configuration conf) { + this.conf = conf; + } + + /** + * {@inheritDoc} + */ + public Configuration getConf() { + return conf; + } + + /** + * Return the priority queue of RecordReaders for this composite, ordered by comparing their key() values. + */ + protected PriorityQueue> getRecordReaderQueue() { + return q; + } + + /** + * Return comparator defining the ordering for RecordReaders in this + * composite. + */ + protected WritableComparator getComparator() { + return cmp; + } + + /** + * Add a RecordReader to this collection. 
The id() of a RecordReader + * determines where in the Tuple its entry will appear. Adding RecordReaders + * with the same id has undefined behavior. + */ + public void add(ComposableRecordReader rr) throws IOException { + kids[rr.id()] = rr; + if (null == q) { + cmp = WritableComparator.get(rr.createKey().getClass()); + q = new PriorityQueue>(3, + new Comparator>() { + public int compare(ComposableRecordReader o1, + ComposableRecordReader o2) { + return cmp.compare(o1.key(), o2.key()); + } + }); + } + if (rr.hasNext()) { + q.add(rr); + } + } + + /** + * Collector for join values. This accumulates values for a given key from the + * child RecordReaders. If one or more child RR contain duplicate keys, this + * will emit the cross product of the associated values until exhausted. + */ + class JoinCollector { + private K key; + private ResetableIterator[] iters; + private int pos = -1; + private boolean first = true; + + /** + * Construct a collector capable of handling the specified number of + * children. + */ + @SuppressWarnings("unchecked") + // Generic array assignment + public JoinCollector(int card) { + iters = new ResetableIterator[card]; + for (int i = 0; i < iters.length; ++i) { + iters[i] = EMPTY; + } + } + + /** + * Register a given iterator at position id. + */ + public void add(int id, ResetableIterator i) throws IOException { + iters[id] = i; + } + + /** + * Return the key associated with this collection. + */ + public K key() { + return key; + } + + /** + * Codify the contents of the collector to be iterated over. When this is + * called, all RecordReaders registered for this key should have added + * ResetableIterators. + */ + public void reset(K key) { + this.key = key; + first = true; + pos = iters.length - 1; + for (int i = 0; i < iters.length; ++i) { + iters[i].reset(); + } + } + + /** + * Clear all state information. 
+ */ + public void clear() { + key = null; + pos = -1; + for (int i = 0; i < iters.length; ++i) { + iters[i].clear(); + iters[i] = EMPTY; + } + } + + /** + * Returns false if exhausted or if reset(K) has not been called. + */ + protected boolean hasNext() { + return !(pos < 0); + } + + /** + * Populate Tuple from iterators. It should be the case that, given + * iterators i_1...i_n over values from sources s_1...s_n sharing key k, + * repeated calls to next should yield I x I. + */ + @SuppressWarnings("unchecked") + // No static typeinfo on Tuples + protected boolean next(TupleWritable val) throws IOException { + if (first) { + int i = -1; + for (pos = 0; pos < iters.length; ++pos) { + if (iters[pos].hasNext() && iters[pos].next((X) val.get(pos))) { + i = pos; + val.setWritten(i); + } + } + pos = i; + first = false; + if (pos < 0) { + clear(); + return false; + } + return true; + } + while (0 <= pos + && !(iters[pos].hasNext() && iters[pos].next((X) val.get(pos)))) { + --pos; + } + if (pos < 0) { + clear(); + return false; + } + val.setWritten(pos); + for (int i = 0; i < pos; ++i) { + if (iters[i].replay((X) val.get(i))) { + val.setWritten(i); + } + } + while (pos + 1 < iters.length) { + ++pos; + iters[pos].reset(); + if (iters[pos].hasNext() && iters[pos].next((X) val.get(pos))) { + val.setWritten(pos); + } + } + return true; + } + + /** + * Replay the last Tuple emitted. + */ + @SuppressWarnings("unchecked") + // No static typeinfo on Tuples + public boolean replay(TupleWritable val) throws IOException { + // The last emitted tuple might have drawn on an empty source; + // it can't be cleared prematurely, b/c there may be more duplicate + // keys in iterator positions < pos + assert !first; + boolean ret = false; + for (int i = 0; i < iters.length; ++i) { + if (iters[i].replay((X) val.get(i))) { + val.setWritten(i); + ret = true; + } + } + return ret; + } + + /** + * Close all child iterators. 
+ */ + public void close() throws IOException { + for (int i = 0; i < iters.length; ++i) { + iters[i].close(); + } + } + + /** + * Write the next value into key, value as accepted by the operation + * associated with this set of RecordReaders. + */ + public boolean flush(TupleWritable value) throws IOException { + while (hasNext()) { + value.clearWritten(); + if (next(value) && combine(kids, value)) { + return true; + } + } + return false; + } + } + + /** + * Return the key for the current join or the value at the top of the + * RecordReader heap. + */ + public K key() { + if (jc.hasNext()) { + return jc.key(); + } + if (!q.isEmpty()) { + return q.peek().key(); + } + return null; + } + + /** + * Clone the key at the top of this RR into the given object. + */ + public void key(K key) throws IOException { + WritableUtils.cloneInto(key, key()); + } + + /** + * Return true if it is possible that this could emit more values. + */ + public boolean hasNext() { + return jc.hasNext() || !q.isEmpty(); + } + + /** + * Pass skip key to each queued child RR whose key() compares less than or equal to it, re-queueing children that still have values. + */ + public void skip(K key) throws IOException { + ArrayList> tmp = new ArrayList>(); + while (!q.isEmpty() && cmp.compare(q.peek().key(), key) <= 0) { + tmp.add(q.poll()); + } + for (ComposableRecordReader rr : tmp) { + rr.skip(key); + if (rr.hasNext()) { + q.add(rr); + } + } + } + + /** + * Obtain an iterator over the child RRs apropos of the value type ultimately + * emitted from this join. + */ + protected abstract ResetableIterator getDelegate(); + + /** + * If key provided matches that of this Composite, give JoinCollector iterator + * over values it may emit. 
+ */ + @SuppressWarnings("unchecked") + // No values from static EMPTY class + public void accept(CompositeRecordReader.JoinCollector jc, K key) + throws IOException { + if (hasNext() && 0 == cmp.compare(key, key())) { + fillJoinCollector(createKey()); + jc.add(id, getDelegate()); + return; + } + jc.add(id, EMPTY); + } + + /** + * For all child RRs offering the key provided, obtain an iterator at that + * position in the JoinCollector. + */ + protected void fillJoinCollector(K iterkey) throws IOException { + if (!q.isEmpty()) { + q.peek().key(iterkey); + while (0 == cmp.compare(q.peek().key(), iterkey)) { + ComposableRecordReader t = q.poll(); + t.accept(jc, iterkey); + if (t.hasNext()) { + q.add(t); + } else if (q.isEmpty()) { + return; + } + } + } + } + + /** + * Implement Comparable contract (compare key of join or head of heap with + * that of another). + */ + public int compareTo(ComposableRecordReader other) { + return cmp.compare(key(), other.key()); + } + + /** + * Create a new key value common to all child RRs. + * + * @throws ClassCastException if key classes differ. + */ + @SuppressWarnings("unchecked") + // Explicit check for key class agreement + public K createKey() { + if (null == keyclass) { + final Class cls = kids[0].createKey().getClass(); + for (RecordReader rr : kids) { + if (!cls.equals(rr.createKey().getClass())) { + throw new ClassCastException("Child key classes fail to agree"); + } + } + keyclass = cls.asSubclass(WritableComparable.class); + } + return (K) ReflectionUtils.newInstance(keyclass, getConf()); + } + + /** + * Create a value to be used internally for joins. + */ + protected TupleWritable createInternalValue() { + Writable[] vals = new Writable[kids.length]; + for (int i = 0; i < vals.length; ++i) { + vals[i] = kids[i].createValue(); + } + return new TupleWritable(vals); + } + + /** + * Unsupported (returns zero in all cases). + */ + public long getPos() throws IOException { + return 0; + } + + /** + * Close all child RRs. 
+ */ + public void close() throws IOException { + if (kids != null) { + for (RecordReader rr : kids) { + rr.close(); + } + } + if (jc != null) { + jc.close(); + } + } + + /** + * Report progress as the minimum of all child RR progress. + */ + public float getProgress() throws IOException { + float ret = 1.0f; + for (RecordReader rr : kids) { + ret = Math.min(ret, rr.getProgress()); + } + return ret; + } +} Index: core/src/main/java/org/apache/hama/bsp/join/WrappedRecordReader.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/join/WrappedRecordReader.java (revision 0) +++ core/src/main/java/org/apache/hama/bsp/join/WrappedRecordReader.java (revision 0) @@ -0,0 +1,207 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hama.bsp.join; + +import java.io.IOException; + +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.io.WritableComparator; +import org.apache.hadoop.io.WritableUtils; +import org.apache.hama.bsp.RecordReader; + +/** + * Proxy class for a RecordReader participating in the join framework. 
This + * class keeps track of the "head" key-value pair for the provided + * RecordReader and keeps a store of values matching a key when this source is + * participating in a join. + */ +public class WrappedRecordReader + implements ComposableRecordReader { + + private boolean empty = false; + private RecordReader rr; + private int id; // index at which values will be inserted in collector + + private K khead; // key at the top of this RR + private U vhead; // value assoc with khead + private WritableComparator cmp; + + private ResetableIterator vjoin; + + /** + * For a given RecordReader rr, occupy position id in collector. + */ + WrappedRecordReader(int id, RecordReader rr, + Class cmpcl) throws IOException { + this.id = id; + this.rr = rr; + khead = rr.createKey(); + vhead = rr.createValue(); + try { + cmp = (null == cmpcl) ? WritableComparator.get(khead.getClass()) : cmpcl + .newInstance(); + } catch (InstantiationException e) { + throw (IOException) new IOException().initCause(e); + } catch (IllegalAccessException e) { + throw (IOException) new IOException().initCause(e); + } + vjoin = new StreamBackedIterator(); + next(); + } + + /** {@inheritDoc} */ + public int id() { + return id; + } + + /** + * Return the key at the head of this RR. + */ + public K key() { + return khead; + } + + /** + * Clone the key at the head of this RR into the object supplied. + */ + public void key(K qkey) throws IOException { + WritableUtils.cloneInto(qkey, khead); + } + + /** + * Return true unless the RR- including the k,v pair stored in this object- is + * exhausted. + */ + public boolean hasNext() { + return !empty; + } + + /** + * Skip key-value pairs with keys less than or equal to the key provided. + */ + public void skip(K key) throws IOException { + if (hasNext()) { + while (cmp.compare(khead, key) <= 0 && next()) + ; + } + } + + /** + * Read the next k,v pair into the head of this object; return true iff the RR + * and this are not exhausted. 
+ */ + protected boolean next() throws IOException { + empty = !rr.next(khead, vhead); + return hasNext(); + } + + /** + * Add an iterator to the collector at the position occupied by this + * RecordReader over the values in this stream paired with the key provided + * (ie register a stream of values from this source matching K with a + * collector). + */ + // JoinCollector comes from parent, which has + @SuppressWarnings("unchecked") + // no static type for the slot this sits in + public void accept(CompositeRecordReader.JoinCollector i, K key) + throws IOException { + vjoin.clear(); + if (0 == cmp.compare(key, khead)) { + do { + vjoin.add(vhead); + } while (next() && 0 == cmp.compare(key, khead)); + } + i.add(id, vjoin); + } + + /** + * Write key-value pair at the head of this stream to the objects provided; + * get next key-value pair from proxied RR. + */ + public boolean next(K key, U value) throws IOException { + if (hasNext()) { + WritableUtils.cloneInto(key, khead); + WritableUtils.cloneInto(value, vhead); + next(); + return true; + } + return false; + } + + /** + * Request new key from proxied RR. + */ + public K createKey() { + return rr.createKey(); + } + + /** + * Request new value from proxied RR. + */ + public U createValue() { + return rr.createValue(); + } + + /** + * Request progress from proxied RR. + */ + public float getProgress() throws IOException { + return rr.getProgress(); + } + + /** + * Request position from proxied RR. + */ + public long getPos() throws IOException { + return rr.getPos(); + } + + /** + * Forward close request to proxied RR. + */ + public void close() throws IOException { + rr.close(); + } + + /** + * Implement Comparable contract (compare key at head of proxied RR with that + * of another). + */ + public int compareTo(ComposableRecordReader other) { + return cmp.compare(key(), other.key()); + } + + /** + * Return true iff other is a ComposableRecordReader and compareTo(other) returns zero. 
+ */ + @SuppressWarnings("unchecked") + // Explicit type check prior to cast + public boolean equals(Object other) { + return other instanceof ComposableRecordReader + && 0 == compareTo((ComposableRecordReader) other); + } + + public int hashCode() { + assert false : "hashCode not designed"; + return 42; + } + +} Index: core/src/main/java/org/apache/hama/bsp/join/CompositeInputFormat.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/join/CompositeInputFormat.java (revision 0) +++ core/src/main/java/org/apache/hama/bsp/join/CompositeInputFormat.java (revision 0) @@ -0,0 +1,164 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hama.bsp.join; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hama.bsp.BSPJob; +import org.apache.hama.bsp.InputFormat; +import org.apache.hama.bsp.InputSplit; + +public class CompositeInputFormat implements + ComposableInputFormat { + + // expression parse tree to which IF requests are proxied + private Parser.Node root; + + public CompositeInputFormat() { + } + + /** + * Interpret a given string as a composite expression. + * {@code + * func ::= <ident>([<func>,]*<func>) + * func ::= tbl(<class>,"<path>") + * class ::= @see java.lang.Class#forName(java.lang.String) + * path ::= @see org.apache.hadoop.fs.Path#Path(java.lang.String) + * } Reads expression from the bsp.join.expr property and + * user-supplied join types from bsp.join.define.&lt;ident&gt; + * types. Paths supplied to tbl are given as input paths to the + * InputFormat class listed. + * + * @see #compose(java.lang.String, java.lang.Class, java.lang.String...) + */ + public void setFormat(BSPJob job) throws IOException { + addDefaults(); + addUserIdentifiers(job); + root = Parser.parse(job.get("bsp.join.expr"), job); + } + + /** + * Adds the default set of identifiers to the parser. + */ + protected void addDefaults() { + try { + Parser.CNode.addIdentifier("inner", InnerJoinRecordReader.class); + Parser.CNode.addIdentifier("outer", OuterJoinRecordReader.class); + Parser.CNode.addIdentifier("override", OverrideRecordReader.class); + Parser.WNode.addIdentifier("tbl", WrappedRecordReader.class); + } catch (NoSuchMethodException e) { + throw new RuntimeException("FATAL: Failed to init defaults", e); + } + } + + /** + * Inform the parser of user-defined types. 
+ */ + private void addUserIdentifiers(BSPJob job) throws IOException { + Pattern x = Pattern.compile("^bsp\\.join\\.define\\.(\\w+)$"); + for (Map.Entry kv : job.getConfiguration()) { + Matcher m = x.matcher(kv.getKey()); + if (m.matches()) { + try { + Parser.CNode.addIdentifier(m.group(1), job.getConfiguration() + .getClass(m.group(0), null, ComposableRecordReader.class)); + } catch (NoSuchMethodException e) { + throw (IOException) new IOException("Invalid define for " + + m.group(1)).initCause(e); + } + } + } + } + + /** + * Build a CompositeInputSplit from the child InputFormats by assigning the + * ith split from each child to the ith composite split. + */ + public InputSplit[] getSplits(BSPJob job, int numBspTask) throws IOException { + setFormat(job); + job.getConfiguration().setLong("bsp.min.split.size", Long.MAX_VALUE); + return root.getSplits(job, numBspTask); + } + + /** + * Construct a CompositeRecordReader for the children of this InputFormat as + * defined in the init expression. The outermost join need only be composable, + * not necessarily a composite. Mandating TupleWritable isn't strictly + * correct. + */ + @SuppressWarnings("unchecked") + // child types unknown + public ComposableRecordReader getRecordReader( + InputSplit split, BSPJob job) throws IOException { + setFormat(job); + return root.getRecordReader(split, job); + } + + /** + * Convenience method for constructing composite formats. Given InputFormat + * class (inf), path (p) return: {@code tbl(<inf>, + *
"<p>"
) } + */ + public static String compose(Class inf, String path) { + return compose(inf.getName().intern(), path, new StringBuffer()).toString(); + } + + /** + * Convenience method for constructing composite formats. Given operation + * (op), Object class (inf), set of paths (p) return: + * {@code <op>(tbl(<inf>,"<p1>"),tbl(<inf>,"<p2>"),...,tbl(<inf>,"<pn>")) } + */ + public static String compose(String op, Class inf, + String... path) { + final String infname = inf.getName(); + StringBuffer ret = new StringBuffer(op + '('); + for (String p : path) { + compose(infname, p, ret); + ret.append(','); + } + ret.setCharAt(ret.length() - 1, ')'); + return ret.toString(); + } + + /** + * Convenience method for constructing composite formats. Given operation + * (op), Object class (inf), set of paths (p) return: + * {@code <op>(tbl(<inf>,"<p1>"),tbl(<inf>,"<p2>"),...,tbl(<inf>,"<pn>")) } + */ + public static String compose(String op, Class inf, + Path... path) { + ArrayList tmp = new ArrayList(path.length); + for (Path p : path) { + tmp.add(p.toString()); + } + return compose(op, inf, tmp.toArray(new String[0])); + } + + private static StringBuffer compose(String inf, String path, StringBuffer sb) { + sb.append("tbl(" + inf + ",\""); + sb.append(path); + sb.append("\")"); + return sb; + } +} Index: core/src/main/java/org/apache/hama/bsp/join/Parser.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/join/Parser.java (revision 0) +++ core/src/main/java/org/apache/hama/bsp/join/Parser.java (revision 0) @@ -0,0 +1,497 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hama.bsp.join; + +import java.io.CharArrayReader; +import java.io.IOException; +import java.io.StreamTokenizer; +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.ListIterator; +import java.util.Map; +import java.util.Stack; + +import org.apache.hadoop.io.WritableComparator; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hama.HamaConfiguration; +import org.apache.hama.bsp.BSPJob; +import org.apache.hama.bsp.FileInputFormat; +import org.apache.hama.bsp.InputFormat; +import org.apache.hama.bsp.InputSplit; +import org.apache.hama.bsp.RecordReader; + +/** + * Very simple shift-reduce parser for join expressions. + * + * This should be sufficient for the user extension permitted now, but ought to + * be replaced with a parser generator if more complex grammars are supported. + * In particular, this "shift-reduce" parser has no states. Each set + * of formals requires a different internal node type, which is responsible for + * interpreting the list of tokens it receives. This is sufficient for the + * current grammar, but it has several annoying properties that might inhibit + * extension. In particular, parenthesis are always function calls; an algebraic + * or filter grammar would not only require a node type, but must also work + * around the internals of this parser. 
+ * + * For most other cases, adding classes to the hierarchy- particularly by + * extending JoinRecordReader and MultiFilterRecordReader- is fairly + * straightforward. One need only override the relevant method(s) (usually only + * {@link CompositeRecordReader#combine}) and include a property to map its + * value to an identifier in the parser. + */ +public class Parser { + public enum TType { + CIF, IDENT, COMMA, LPAREN, RPAREN, QUOT, NUM, + } + + /** + * Tagged-union type for tokens from the join expression; each typed accessor throws IOException unless a subclass overrides it. + * + * @see Parser.TType + */ + public static class Token { + + private TType type; + + Token(TType type) { + this.type = type; + } + + public TType getType() { + return type; + } + + public Node getNode() throws IOException { + throw new IOException("Expected nodetype"); + } + + public double getNum() throws IOException { + throw new IOException("Expected numtype"); + } + + public String getStr() throws IOException { + throw new IOException("Expected strtype"); + } + } + + public static class NumToken extends Token { + private double num; + + public NumToken(double num) { + super(TType.NUM); + this.num = num; + } + + public double getNum() { + return num; + } + } + + public static class NodeToken extends Token { + private Node node; + + NodeToken(Node node) { + super(TType.CIF); + this.node = node; + } + + public Node getNode() { + return node; + } + } + + public static class StrToken extends Token { + private String str; + + public StrToken(TType type, String str) { + super(type); + this.str = str; + } + + public String getStr() { + return str; + } + } + + /** + * Simple lexer wrapping a StreamTokenizer. This encapsulates the creation of + * tagged-union Tokens and initializes the StreamTokenizer. 
+ */ + private static class Lexer { + + private StreamTokenizer tok; + + Lexer(String s) { + tok = new StreamTokenizer(new CharArrayReader(s.toCharArray())); + tok.quoteChar('"'); + tok.parseNumbers(); + tok.ordinaryChar(','); + tok.ordinaryChar('('); + tok.ordinaryChar(')'); + tok.wordChars('$', '$'); + tok.wordChars('_', '_'); + } + + Token next() throws IOException { + int type = tok.nextToken(); + switch (type) { + case StreamTokenizer.TT_EOF: + case StreamTokenizer.TT_EOL: + return null; + case StreamTokenizer.TT_NUMBER: + return new NumToken(tok.nval); + case StreamTokenizer.TT_WORD: + return new StrToken(TType.IDENT, tok.sval); + case '"': + return new StrToken(TType.QUOT, tok.sval); + default: + switch (type) { + case ',': + return new Token(TType.COMMA); + case '(': + return new Token(TType.LPAREN); + case ')': + return new Token(TType.RPAREN); + default: + throw new IOException("Unexpected: " + type); + } + } + } + } + + public abstract static class Node implements ComposableInputFormat { + /** + * Return the node type registered for the particular identifier. By + * default, this is a CNode for any composite node and a WNode for + * "wrapped" nodes. User nodes will likely be composite nodes. 
+ * + * @see #addIdentifier(java.lang.String, java.lang.Class[], java.lang.Class, + * java.lang.Class) + * @see CompositeInputFormat#setFormat(org.apache.hama.bsp.BSPJob) + */ + static Node forIdent(String ident) throws IOException { + try { + if (!nodeCstrMap.containsKey(ident)) { + throw new IOException("No nodetype for " + ident); + } + return nodeCstrMap.get(ident).newInstance(ident); + } catch (IllegalAccessException e) { + throw (IOException) new IOException().initCause(e); + } catch (InstantiationException e) { + throw (IOException) new IOException().initCause(e); + } catch (InvocationTargetException e) { + throw (IOException) new IOException().initCause(e); + } + } + + private static final Class[] ncstrSig = { String.class }; + private static final Map> nodeCstrMap = new HashMap>(); + protected static final Map> rrCstrMap = new HashMap>(); + + /** + * For a given identifier, add a mapping to the nodetype for the parse tree + * and to the ComposableRecordReader to be created, including the formals + * required to invoke the constructor. The nodetype and constructor + * signature should be filled in from the child node. + */ + protected static void addIdentifier(String ident, Class[] mcstrSig, + Class nodetype, + Class cl) + throws NoSuchMethodException { + Constructor ncstr = nodetype + .getDeclaredConstructor(ncstrSig); + ncstr.setAccessible(true); + nodeCstrMap.put(ident, ncstr); + Constructor mcstr = cl + .getDeclaredConstructor(mcstrSig); + mcstr.setAccessible(true); + rrCstrMap.put(ident, mcstr); + } + + // inst + protected int id = -1; + protected String ident; + protected Class cmpcl; + + protected Node(String ident) { + this.ident = ident; + } + + protected void setID(int id) { + this.id = id; + } + + protected void setKeyComparator(Class cmpcl) { + this.cmpcl = cmpcl; + } + + abstract void parse(List args, BSPJob job) throws IOException; + } + + /** + * Nodetype in the parse tree for "wrapped" InputFormats. 
+ */ + static class WNode extends Node { + private static final Class[] cstrSig = { Integer.TYPE, + RecordReader.class, Class.class }; + + static void addIdentifier(String ident, + Class cl) + throws NoSuchMethodException { + Node.addIdentifier(ident, cstrSig, WNode.class, cl); + } + + private String indir; + private InputFormat inf; + + public WNode(String ident) { + super(ident); + } + + /** + * Let the first actual define the InputFormat and the second define the + * bsp.input.dir property. + */ + public void parse(List ll, BSPJob job) throws IOException { + StringBuilder sb = new StringBuilder(); + Iterator i = ll.iterator(); + while (i.hasNext()) { + Token t = i.next(); + if (TType.COMMA.equals(t.getType())) { + try { + inf = (InputFormat) ReflectionUtils.newInstance(job + .getConfiguration().getClassByName(sb.toString()), job + .getConfiguration()); + } catch (ClassNotFoundException e) { + throw (IOException) new IOException().initCause(e); + } catch (IllegalArgumentException e) { + throw (IOException) new IOException().initCause(e); + } + break; + } + sb.append(t.getStr()); + } + if (!i.hasNext()) { + throw new IOException("Parse error"); + } + Token t = i.next(); + if (!TType.QUOT.equals(t.getType())) { + throw new IOException("Expected quoted string"); + } + indir = t.getStr(); + // no check for ll.isEmpty() to permit extension + } + + private BSPJob getConf(BSPJob job) throws IOException { + BSPJob bspJob = new BSPJob((HamaConfiguration) job.getConfiguration()); + FileInputFormat.setInputPaths(bspJob, indir); + return bspJob; + } + + public InputSplit[] getSplits(BSPJob job, int numSplits) throws IOException { + return inf.getSplits(getConf(job), numSplits); + } + + public ComposableRecordReader getRecordReader(InputSplit split, BSPJob job) + throws IOException { + try { + if (!rrCstrMap.containsKey(ident)) { + throw new IOException("No RecordReader for " + ident); + } + return rrCstrMap.get(ident).newInstance(id, + inf.getRecordReader(split, 
getConf(job)), cmpcl); + } catch (IllegalAccessException e) { + throw (IOException) new IOException().initCause(e); + } catch (InstantiationException e) { + throw (IOException) new IOException().initCause(e); + } catch (InvocationTargetException e) { + throw (IOException) new IOException().initCause(e); + } + } + + public String toString() { + return ident + "(" + inf.getClass().getName() + ",\"" + indir + "\")"; + } + } + + /** + * Internal nodetype for "composite" InputFormats. + */ + static class CNode extends Node { + + private static final Class[] cstrSig = { Integer.TYPE, BSPJob.class, + Integer.TYPE, Class.class }; + + static void addIdentifier(String ident, + Class cl) + throws NoSuchMethodException { + Node.addIdentifier(ident, cstrSig, CNode.class, cl); + } + + // inst + private ArrayList kids = new ArrayList(); + + public CNode(String ident) { + super(ident); + } + + public void setKeyComparator(Class cmpcl) { + super.setKeyComparator(cmpcl); + for (Node n : kids) { + n.setKeyComparator(cmpcl); + } + } + + /** + * Combine InputSplits from child InputFormats into a + * {@link CompositeInputSplit}. 
+ */ + public InputSplit[] getSplits(BSPJob job, int numSplits) throws IOException { + InputSplit[][] splits = new InputSplit[kids.size()][]; + for (int i = 0; i < kids.size(); ++i) { + final InputSplit[] tmp = kids.get(i).getSplits(job, numSplits); + if (null == tmp) { + throw new IOException("Error gathering splits from child RReader"); + } + if (i > 0 && splits[i - 1].length != tmp.length) { + throw new IOException("Inconsistent split cardinality from child " + + i + " (" + splits[i - 1].length + "/" + tmp.length + ")"); + } + splits[i] = tmp; + } + final int size = splits[0].length; + CompositeInputSplit[] ret = new CompositeInputSplit[size]; + for (int i = 0; i < size; ++i) { + ret[i] = new CompositeInputSplit(splits.length); + for (int j = 0; j < splits.length; ++j) { + ret[i].add(splits[j][i]); + } + } + return ret; + } + + @SuppressWarnings("unchecked") + // child types unknowable + public ComposableRecordReader getRecordReader(InputSplit split, BSPJob job) + throws IOException { + if (!(split instanceof CompositeInputSplit)) { + throw new IOException("Invalid split type:" + + split.getClass().getName()); + } + final CompositeInputSplit spl = (CompositeInputSplit) split; + final int capacity = kids.size(); + CompositeRecordReader ret = null; + try { + if (!rrCstrMap.containsKey(ident)) { + throw new IOException("No RecordReader for " + ident); + } + ret = (CompositeRecordReader) rrCstrMap.get(ident).newInstance(id, job, + capacity, cmpcl); + } catch (IllegalAccessException e) { + throw (IOException) new IOException().initCause(e); + } catch (InstantiationException e) { + throw (IOException) new IOException().initCause(e); + } catch (InvocationTargetException e) { + throw (IOException) new IOException().initCause(e); + } + for (int i = 0; i < capacity; ++i) { + ret.add(kids.get(i).getRecordReader(spl.get(i), job)); + } + return (ComposableRecordReader) ret; + } + + /** + * Parse a list of comma-separated nodes. 
+ */ + public void parse(List args, BSPJob job) throws IOException { + ListIterator i = args.listIterator(); + while (i.hasNext()) { + Token t = i.next(); + t.getNode().setID(i.previousIndex() >> 1); + kids.add(t.getNode()); + if (i.hasNext() && !TType.COMMA.equals(i.next().getType())) { + throw new IOException("Expected ','"); + } + } + } + + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append(ident + "("); + for (Node n : kids) { + sb.append(n.toString() + ","); + } + sb.setCharAt(sb.length() - 1, ')'); + return sb.toString(); + } + } + + private static Token reduce(Stack st, BSPJob job) throws IOException { + LinkedList args = new LinkedList(); + while (!st.isEmpty() && !TType.LPAREN.equals(st.peek().getType())) { + args.addFirst(st.pop()); + } + if (st.isEmpty()) { + throw new IOException("Unmatched ')'"); + } + st.pop(); + if (st.isEmpty() || !TType.IDENT.equals(st.peek().getType())) { + throw new IOException("Identifier expected"); + } + Node n = Node.forIdent(st.pop().getStr()); + n.parse(args, job); + return new NodeToken(n); + } + + /** + * Given an expression and an optional comparator, build a tree of + * InputFormats using the comparator to sort keys. 
+ */ + static Node parse(String expr, BSPJob job) throws IOException { + if (null == expr) { + throw new IOException("Expression is null"); + } + Class cmpcl = job.getConfiguration() + .getClass("bsp.join.keycomparator", null, WritableComparator.class); + Lexer lex = new Lexer(expr); + Stack st = new Stack(); + Token tok; + while ((tok = lex.next()) != null) { + if (TType.RPAREN.equals(tok.getType())) { + st.push(reduce(st, job)); + } else { + st.push(tok); + } + } + if (st.size() == 1 && TType.CIF.equals(st.peek().getType())) { + Node ret = st.pop().getNode(); + if (cmpcl != null) { + ret.setKeyComparator(cmpcl); + } + return ret; + } + throw new IOException("Missing ')'"); + } + +} Index: core/src/main/java/org/apache/hama/bsp/join/CompositeInputSplit.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/join/CompositeInputSplit.java (revision 0) +++ core/src/main/java/org/apache/hama/bsp/join/CompositeInputSplit.java (revision 0) @@ -0,0 +1,153 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hama.bsp.join; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.HashSet; + +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.WritableUtils; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hama.bsp.InputSplit; + +/** + * This InputSplit contains a set of child InputSplits. Any InputSplit inserted + * into this collection must have a public default constructor. + */ +public class CompositeInputSplit implements InputSplit { + + private int fill = 0; + private long totsize = 0L; + private InputSplit[] splits; + + public CompositeInputSplit() { + } + + public CompositeInputSplit(int capacity) { + splits = new InputSplit[capacity]; + } + + /** + * Add an InputSplit to this collection. + * + * @throws IOException If capacity was not specified during construction or if + * capacity has been reached. + */ + public void add(InputSplit s) throws IOException { + if (null == splits) { + throw new IOException("Uninitialized InputSplit"); + } + if (fill == splits.length) { + throw new IOException("Too many splits"); + } + splits[fill++] = s; + totsize += s.getLength(); + } + + /** + * Get ith child InputSplit. + */ + public InputSplit get(int i) { + return splits[i]; + } + + /** + * Return the aggregate length of all child InputSplits currently added. + */ + public long getLength() throws IOException { + return totsize; + } + + /** + * Get the length of ith child InputSplit. + */ + public long getLength(int i) throws IOException { + return splits[i].getLength(); + } + + /** + * Collect a set of hosts from all child InputSplits. 
+ */ + public String[] getLocations() throws IOException { + HashSet hosts = new HashSet(); + for (InputSplit s : splits) { + String[] hints = s.getLocations(); + if (hints != null && hints.length > 0) { + for (String host : hints) { + hosts.add(host); + } + } + } + return hosts.toArray(new String[hosts.size()]); + } + + /** + * getLocations from ith InputSplit. + */ + public String[] getLocation(int i) throws IOException { + return splits[i].getLocations(); + } + + /** + * Write splits in the following format. + * {@code + * <count><class1><class2>...<classn><split1><split2>...<splitn> + * } + */ + public void write(DataOutput out) throws IOException { + WritableUtils.writeVInt(out, splits.length); + for (InputSplit s : splits) { + Text.writeString(out, s.getClass().getName()); + } + for (InputSplit s : splits) { + s.write(out); + } + } + + /** + * {@inheritDoc} + * + * @throws IOException If the child InputSplit cannot be read, typically for + * failing access checks. + */ + @SuppressWarnings("unchecked") + // Generic array assignment + public void readFields(DataInput in) throws IOException { + int card = WritableUtils.readVInt(in); + if (splits == null || splits.length != card) { + splits = new InputSplit[card]; + } + Class[] cls = new Class[card]; + try { + for (int i = 0; i < card; ++i) { + cls[i] = Class.forName(Text.readString(in)) + .asSubclass(InputSplit.class); + } + for (int i = 0; i < card; ++i) { + splits[i] = ReflectionUtils.newInstance(cls[i], null); + splits[i].readFields(in); + } + } catch (ClassNotFoundException e) { + throw (IOException) new IOException("Failed split init").initCause(e); + } + } + +} Index: core/src/main/java/org/apache/hama/bsp/join/StreamBackedIterator.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/join/StreamBackedIterator.java (revision 0) +++ core/src/main/java/org/apache/hama/bsp/join/StreamBackedIterator.java (revision 0) @@ -0,0 +1,101 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + 
* or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.bsp.join; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; + +import org.apache.hadoop.io.Writable; + +/** + * This class provides an implementation of ResetableIterator. This + * implementation uses a byte array to store elements added to it. 
+ */ +public class StreamBackedIterator implements + ResetableIterator { + + private static class ReplayableByteInputStream extends ByteArrayInputStream { + public ReplayableByteInputStream(byte[] arr) { + super(arr); + } + + public void resetStream() { + mark = 0; /* move the inherited mark to offset 0 so the reset() below rewinds to the start */ + reset(); + } + } + + private ByteArrayOutputStream outbuf = new ByteArrayOutputStream(); + private DataOutputStream outfbuf = new DataOutputStream(outbuf); + private ReplayableByteInputStream inbuf; + private DataInputStream infbuf; + + public StreamBackedIterator() { + } + + public boolean hasNext() { /* false until reset() has created the read-side streams */ + return infbuf != null && inbuf.available() > 0; + } + + public boolean next(X val) throws IOException { + if (hasNext()) { + inbuf.mark(0); /* remember where this record begins so replay() can re-read it */ + val.readFields(infbuf); + return true; + } + return false; + } + + public boolean replay(X val) throws IOException { /* NOTE(review): inbuf is null until reset() has been called */ + inbuf.reset(); /* return to the most recently marked position */ + if (0 == inbuf.available()) + return false; + val.readFields(infbuf); + return true; + } + + public void reset() { + if (null != outfbuf) { /* write side is open: snapshot its bytes into fresh read-side streams */ + inbuf = new ReplayableByteInputStream(outbuf.toByteArray()); + infbuf = new DataInputStream(inbuf); + outfbuf = null; /* NOTE(review): add() passes this null to item.write until clear() reopens it */ + } + inbuf.resetStream(); + } + + public void add(X item) throws IOException { + item.write(outfbuf); + } + + public void close() throws IOException { + if (null != infbuf) + infbuf.close(); + if (null != outfbuf) + outfbuf.close(); + } + + public void clear() { + if (null != inbuf) + inbuf.resetStream(); + outbuf.reset(); /* discard buffered bytes; inbuf keeps its earlier snapshot until the next reset() */ + outfbuf = new DataOutputStream(outbuf); + } +} Index: core/src/main/java/org/apache/hama/bsp/join/OuterJoinRecordReader.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/join/OuterJoinRecordReader.java (revision 0) +++ core/src/main/java/org/apache/hama/bsp/join/OuterJoinRecordReader.java (revision 0) @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hama.bsp.join; + +import java.io.IOException; + +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.io.WritableComparator; +import org.apache.hama.bsp.BSPJob; + +/** + * Full outer join: combine() accepts every tuple it is offered. + */ +public class OuterJoinRecordReader extends + JoinRecordReader { + + OuterJoinRecordReader(int id, BSPJob job, int capacity, + Class cmpcl) throws IOException { + super(id, job, capacity, cmpcl); + } + + /** + * Emit everything from the collector; this always returns true. + */ + protected boolean combine(Object[] srcs, TupleWritable dst) { + assert srcs.length == dst.size(); /* sanity check only; evaluated when assertions are enabled */ + return true; + } +} Index: core/src/main/java/org/apache/hama/bsp/join/OverrideRecordReader.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/join/OverrideRecordReader.java (revision 0) +++ core/src/main/java/org/apache/hama/bsp/join/OverrideRecordReader.java (revision 0) @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hama.bsp.join; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.PriorityQueue; + +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.io.WritableComparator; +import org.apache.hama.bsp.BSPJob; + +/** + * Prefer the "rightmost" data source for this key. For example, + * override(S1,S2,S3) will prefer values from S3 over S2, and values + * from S2 over S1 for all keys emitted from all sources. + */ +public class OverrideRecordReader + extends MultiFilterRecordReader { + + OverrideRecordReader(int id, BSPJob job, int capacity, + Class cmpcl) throws IOException { + super(id, job, capacity, cmpcl); + } + + /** + * Emit the value with the highest position in the tuple. + */ + @SuppressWarnings("unchecked") + // No static typeinfo on Tuples + protected V emit(TupleWritable dst) { + return (V) dst.iterator().next(); /* first element yielded by the tuple's iterator */ + } + + /** + * Instead of filling the JoinCollector with iterators from all data sources, + * fill only the rightmost for this key. This not only saves space by + * discarding the other sources, but it also emits the number of key-value + * pairs in the preferred RecordReader instead of repeating that stream n + * times, where n is the cardinality of the cross product of the discarded + * streams for the given key. 
+ */ + protected void fillJoinCollector(K iterkey) throws IOException { + final PriorityQueue> q = getRecordReaderQueue(); + if (!q.isEmpty()) { + int highpos = -1; /* index in list of the reader with the largest id seen so far; -1 means none yet */ + ArrayList> list = new ArrayList>( + kids.length); + q.peek().key(iterkey); /* copy the head reader's key into iterkey */ + final WritableComparator cmp = getComparator(); + while (0 == cmp.compare(q.peek().key(), iterkey)) { /* drain every reader whose head key equals iterkey */ + ComposableRecordReader t = q.poll(); + if (-1 == highpos || list.get(highpos).id() < t.id()) { + highpos = list.size(); /* the index t is about to occupy in list */ + } + list.add(t); + if (q.isEmpty()) + break; + } + ComposableRecordReader t = list.remove(highpos); + t.accept(jc, iterkey); /* only the reader with the largest id registers its values with the collector */ + for (ComposableRecordReader rr : list) { + rr.skip(iterkey); /* move the remaining readers past this key without collecting */ + } + list.add(t); /* put the preferred reader back so it is re-queued with the rest */ + for (ComposableRecordReader rr : list) { + if (rr.hasNext()) { /* readers reporting no further input are dropped from the queue */ + q.add(rr); + } + } + } + } + +} Index: core/src/main/java/org/apache/hama/bsp/join/ComposableRecordReader.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/join/ComposableRecordReader.java (revision 0) +++ core/src/main/java/org/apache/hama/bsp/join/ComposableRecordReader.java (revision 0) @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hama.bsp.join; + +import java.io.IOException; + +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hama.bsp.RecordReader; + +/** + * Additional operations required of a RecordReader to participate in a join. + */ +public interface ComposableRecordReader + extends RecordReader, Comparable> { + + /** + * Return the position in the collector this reader occupies. + */ + int id(); + + /** + * Return the key this RecordReader would supply on a call to next(K,V). + */ + K key(); + + /** + * Clone the key at the head of this RecordReader into the object provided. + */ + void key(K key) throws IOException; + + /** + * Return true if the stream is not empty, but provide no guarantee that a + * call to next(K,V) will succeed. + */ + boolean hasNext(); + + /** + * Skip key-value pairs with keys less than or equal to the key provided. + */ + void skip(K key) throws IOException; + + /** + * While key-value pairs from this RecordReader match the given key, register + * them with the JoinCollector provided. + */ + void accept(CompositeRecordReader.JoinCollector jc, K key) throws IOException; +} Index: core/src/main/java/org/apache/hama/bsp/BSPJobClient.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/BSPJobClient.java (revision 1545639) +++ core/src/main/java/org/apache/hama/bsp/BSPJobClient.java (working copy) @@ -324,7 +324,8 @@ short replication = (short) job.getInt("bsp.submit.replication", 10); // only create the splits if we have an input - if (job.get("bsp.input.dir") != null) { + if ((job.get("bsp.input.dir") != null) + || (job.get("bsp.join.expr") != null)) { // Create the splits for the job LOG.debug("Creating splits at " + fs.makeQualified(submitSplitFile));