Index: build-common.xml =================================================================== --- build-common.xml (revision 894963) +++ build-common.xml (working copy) @@ -311,7 +311,7 @@ + excludes="**/TestSerDe.class,**/*$*.class" /> Index: contrib/src/test/org/apache/hadoop/hive/contrib/mr/TestGenericMR.java =================================================================== --- contrib/src/test/org/apache/hadoop/hive/contrib/mr/TestGenericMR.java (revision 0) +++ contrib/src/test/org/apache/hadoop/hive/contrib/mr/TestGenericMR.java (revision 0) @@ -0,0 +1,130 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.contrib.mr; + +import java.io.StringReader; +import java.io.StringWriter; +import java.util.Iterator; +import java.util.NoSuchElementException; + +import junit.framework.TestCase; + + +public final class TestGenericMR extends TestCase { + public void testReduceTooFar() throws Exception { + try { + new GenericMR().reduce(new StringReader("a\tb\tc"), new StringWriter(), new Reducer() { + public void reduce(String key, Iterator records, Output output) throws Exception { + while (true) { + records.next(); + } + } + }); + } catch (final NoSuchElementException nsee) { + // expected + return; + } + + fail("Expected NoSuchElementException"); + } + + public void testEmptyMap() throws Exception { + final StringWriter out = new StringWriter(); + + new GenericMR().map(new StringReader(""), out, identityMapper()); + + assertEquals(0, out.toString().length()); + } + + public void testIdentityMap() throws Exception { + final String in = "a\tb\nc\td"; + final StringWriter out = new StringWriter(); + + new GenericMR().map(new StringReader(in), out, identityMapper()); + + assertEquals(in + "\n", out.toString()); + } + + public void testKVSplitMap() throws Exception { + final String in = "k1=v1,k2=v2\nk1=v2,k2=v3"; + final String expected = "k1\tv1\nk2\tv2\nk1\tv2\nk2\tv3\n"; + final StringWriter out = new StringWriter(); + + new GenericMR().map(new StringReader(in), out, new Mapper() { + public void map(String[] record, Output output) throws Exception { + for (final String kvs : record[0].split(",")) { + final String[] kv = kvs.split("="); + output.collect(new String[] { kv[0], kv[1] }); + } + } + }); + + assertEquals(expected, out.toString()); + } + + public void testIdentityReduce() throws Exception { + final String in = "a\tb\nc\td"; + final StringWriter out = new StringWriter(); + + new GenericMR().reduce(new StringReader(in), out, identityReducer()); + + assertEquals(in + "\n", out.toString()); + } + + public void testWordCountReduce() 
throws Exception { + final String in = "hello\t1\nhello\t2\nokay\t4\nokay\t6\nokay\t2"; + final StringWriter out = new StringWriter(); + + new GenericMR().reduce(new StringReader(in), out, new Reducer() { + @Override + public void reduce(String key, Iterator records, Output output) throws Exception { + int count = 0; + + while (records.hasNext()) { + count += Integer.parseInt(records.next()[1]); + } + + output.collect(new String[] { key, String.valueOf(count) }); + } + }); + + final String expected = "hello\t3\nokay\t12\n"; + + assertEquals(expected, out.toString()); + } + + private Mapper identityMapper() { + return new Mapper() { + @Override + public void map(String[] record, Output output) throws Exception { + output.collect(record); + } + }; + } + + private Reducer identityReducer() { + return new Reducer() { + @Override + public void reduce(String key, Iterator records, Output output) throws Exception { + while (records.hasNext()) { + output.collect(records.next()); + } + } + }; + } +} Index: contrib/src/java/org/apache/hadoop/hive/contrib/mr/Output.java =================================================================== --- contrib/src/java/org/apache/hadoop/hive/contrib/mr/Output.java (revision 0) +++ contrib/src/java/org/apache/hadoop/hive/contrib/mr/Output.java (revision 0) @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.contrib.mr; +/** + * Collects output. + */ +public interface Output { + /** + * Add a row to the output. + * + * @param record + * @throws Exception + */ + void collect(String[] record) throws Exception; +} Index: contrib/src/java/org/apache/hadoop/hive/contrib/mr/Mapper.java =================================================================== --- contrib/src/java/org/apache/hadoop/hive/contrib/mr/Mapper.java (revision 0) +++ contrib/src/java/org/apache/hadoop/hive/contrib/mr/Mapper.java (revision 0) @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.contrib.mr; +/** + * Mapper. + */ +public interface Mapper { + /** + * Maps a single row into intermediate rows. 
+ * + * @param record input record + * @param output collect mapped rows. + * @throws Exception on error + */ + void map(String[] record, Output output) throws Exception; +} Index: contrib/src/java/org/apache/hadoop/hive/contrib/mr/Reducer.java =================================================================== --- contrib/src/java/org/apache/hadoop/hive/contrib/mr/Reducer.java (revision 0) +++ contrib/src/java/org/apache/hadoop/hive/contrib/mr/Reducer.java (revision 0) @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.contrib.mr; + +import java.util.Iterator; + +/** + * Simple reducer interface. + */ +public interface Reducer { + /** + * Reduce. + * + * Note that it is assumed that the key is the first column. Additionally, the key + * will be repeated as the first column in the records[] array. + * + * @param key key (first column) for this set of records. + * @param records Iterator of records for this key. Note that the first column of record will also be the key. 
+ * @param output + * @throws Exception + */ + void reduce(String key, Iterator records, Output output) throws Exception; +} Index: contrib/src/java/org/apache/hadoop/hive/contrib/mr/GenericMR.java =================================================================== --- contrib/src/java/org/apache/hadoop/hive/contrib/mr/GenericMR.java (revision 0) +++ contrib/src/java/org/apache/hadoop/hive/contrib/mr/GenericMR.java (revision 0) @@ -0,0 +1,194 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.contrib.mr; + +import java.io.BufferedReader; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.io.PrintWriter; +import java.io.Reader; +import java.io.Writer; +import java.util.Iterator; +import java.util.NoSuchElementException; + +import org.apache.commons.lang.StringUtils; + +/** + * This class attempts to provide a simple framework for writing Hive map/reduce + * tasks in java. + * + * The main benefit is that it deals with grouping the keys together for reduce + * tasks. + * + * Additionally, it deals with all system io... and provides something closer to + * the hadoop m/r. 
+ * + * As an example, here's the wordcount reduce: + * + * new GenericMR().reduce(System.in, System.out, new Reducer() { public void + * reduce(String key, Iterator records, Output output) throws + * Exception { int count = 0; + * + * while (records.hasNext()) { count += Integer.parseInt(records.next()[1]); } + * + * output.collect(new String[] { key, String.valueOf(count) }); } }); + * + */ +public final class GenericMR { + public void map(final InputStream in, final OutputStream out, final Mapper mapper) throws Exception { + map(new InputStreamReader(in), new OutputStreamWriter(out), mapper); + } + + public void map(final Reader in, final Writer out, final Mapper mapper) throws Exception { + handle(in, out, new RecordProcessor() { + @Override + public void processNext(RecordReader reader, Output output) throws Exception { + mapper.map(reader.next(), output); + } + }); + } + + public void reduce(final InputStream in, final OutputStream out, final Reducer reducer) throws Exception { + reduce(new InputStreamReader(in), new OutputStreamWriter(out), reducer); + } + + public void reduce(final Reader in, final Writer out, final Reducer reducer) throws Exception { + handle(in, out, new RecordProcessor() { + @Override + public void processNext(RecordReader reader, Output output) throws Exception { + reducer.reduce(reader.peek()[0], new KeyRecordIterator(reader.peek()[0], reader), output); + } + }); + } + + private void handle(final Reader in, final Writer out, final RecordProcessor processor) throws Exception { + final RecordReader reader = new RecordReader(in); + final OutputStreamOutput output = new OutputStreamOutput(out); + + try { + while (reader.hasNext()) { + processor.processNext(reader, output); + } + } finally { + try { + output.close(); + } finally { + reader.close(); + } + } + } + + private static interface RecordProcessor { + void processNext(final RecordReader reader, final Output output) throws Exception; + } + + private static final class 
KeyRecordIterator implements Iterator { + private final String key; + private final RecordReader reader; + + private KeyRecordIterator(final String key, final RecordReader reader) { + this.key = key; + this.reader = reader; + } + + @Override + public boolean hasNext() { + return (this.reader.hasNext() && this.key.equals(this.reader.peek()[0])); + } + + @Override + public String[] next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + + return this.reader.next(); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + } + + private static final class RecordReader { + private final BufferedReader reader; + private String[] next; + + private RecordReader(final InputStream in) { + this(new InputStreamReader(in)); + } + + private RecordReader(final Reader in) { + this.reader = new BufferedReader(in); + this.next = readNext(); + } + + private String[] next() { + final String[] ret = next; + + this.next = readNext(); + + return ret; + } + + private String[] readNext() { + try { + final String line = this.reader.readLine(); + return (line == null ? null : line.split("\t")); + } catch (final Exception e) { + throw new RuntimeException(e); + } + } + + private boolean hasNext() { + return next != null; + } + + private String[] peek() { + return next; + } + + private void close() throws Exception { + this.reader.close(); + } + } + + private static final class OutputStreamOutput implements Output { + private final PrintWriter out; + + private OutputStreamOutput(final OutputStream out) { + this(new OutputStreamWriter(out)); + } + + private OutputStreamOutput(final Writer out) { + this.out = new PrintWriter(out); + } + + public void close() throws Exception { + out.close(); + } + + @Override + public void collect(String[] record) throws Exception { + out.println(StringUtils.join(record, "\t")); + } + } +}