Index: build-common.xml
===================================================================
--- build-common.xml (revision 894963)
+++ build-common.xml (working copy)
@@ -311,7 +311,7 @@
+ excludes="**/TestSerDe.class,**/*$*.class" />
Index: contrib/src/test/org/apache/hadoop/hive/contrib/mr/TestGenericMR.java
===================================================================
--- contrib/src/test/org/apache/hadoop/hive/contrib/mr/TestGenericMR.java (revision 0)
+++ contrib/src/test/org/apache/hadoop/hive/contrib/mr/TestGenericMR.java (revision 0)
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.contrib.mr;
+
+import java.io.StringReader;
+import java.io.StringWriter;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+import junit.framework.TestCase;
+
+/**
+ * Unit tests for the GenericMR map/reduce harness.
+ */
+public final class TestGenericMR extends TestCase {
+  public void testReduceTooFar() throws Exception {
+    try {
+      new GenericMR().reduce(new StringReader("a\tb\tc"), new StringWriter(), new Reducer() {
+        @Override
+        public void reduce(String key, Iterator<String[]> records, Output output) throws Exception {
+          while (true) {
+            records.next();
+          }
+        }
+      });
+    } catch (final NoSuchElementException nsee) {
+      // expected
+      return;
+    }
+
+    fail("Expected NoSuchElementException");
+  }
+
+  public void testEmptyMap() throws Exception {
+    final StringWriter out = new StringWriter();
+
+    new GenericMR().map(new StringReader(""), out, identityMapper());
+
+    assertEquals(0, out.toString().length());
+  }
+
+  public void testIdentityMap() throws Exception {
+    final String in = "a\tb\nc\td";
+    final StringWriter out = new StringWriter();
+
+    new GenericMR().map(new StringReader(in), out, identityMapper());
+
+    assertEquals(in + "\n", out.toString());
+  }
+
+  public void testKVSplitMap() throws Exception {
+    final String in = "k1=v1,k2=v2\nk1=v2,k2=v3";
+    final String expected = "k1\tv1\nk2\tv2\nk1\tv2\nk2\tv3\n";
+    final StringWriter out = new StringWriter();
+
+    new GenericMR().map(new StringReader(in), out, new Mapper() {
+      @Override
+      public void map(String[] record, Output output) throws Exception {
+        for (final String kvs : record[0].split(",")) {
+          final String[] kv = kvs.split("=");
+          output.collect(new String[] { kv[0], kv[1] });
+        }
+      }
+    });
+
+    assertEquals(expected, out.toString());
+  }
+
+  public void testIdentityReduce() throws Exception {
+    final String in = "a\tb\nc\td";
+    final StringWriter out = new StringWriter();
+
+    new GenericMR().reduce(new StringReader(in), out, identityReducer());
+
+    assertEquals(in + "\n", out.toString());
+  }
+
+  public void testWordCountReduce() throws Exception {
+    final String in = "hello\t1\nhello\t2\nokay\t4\nokay\t6\nokay\t2";
+    final StringWriter out = new StringWriter();
+
+    new GenericMR().reduce(new StringReader(in), out, new Reducer() {
+      @Override
+      public void reduce(String key, Iterator<String[]> records, Output output) throws Exception {
+        int count = 0;
+
+        while (records.hasNext()) {
+          count += Integer.parseInt(records.next()[1]);
+        }
+
+        output.collect(new String[] { key, String.valueOf(count) });
+      }
+    });
+
+    final String expected = "hello\t3\nokay\t12\n";
+
+    assertEquals(expected, out.toString());
+  }
+
+  // Mapper that emits each input record unchanged.
+  private Mapper identityMapper() {
+    return new Mapper() {
+      @Override
+      public void map(String[] record, Output output) throws Exception {
+        output.collect(record);
+      }
+    };
+  }
+
+  // Reducer that emits each grouped record unchanged.
+  private Reducer identityReducer() {
+    return new Reducer() {
+      @Override
+      public void reduce(String key, Iterator<String[]> records, Output output) throws Exception {
+        while (records.hasNext()) {
+          output.collect(records.next());
+        }
+      }
+    };
+  }
+}
Index: contrib/src/java/org/apache/hadoop/hive/contrib/mr/Output.java
===================================================================
--- contrib/src/java/org/apache/hadoop/hive/contrib/mr/Output.java (revision 0)
+++ contrib/src/java/org/apache/hadoop/hive/contrib/mr/Output.java (revision 0)
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.contrib.mr;
+
+/**
+ * Sink for rows produced by a {@link Mapper} or {@link Reducer}.
+ */
+public interface Output {
+  /**
+   * Adds a row to the output.
+   *
+   * @param record the columns of one output row
+   * @throws Exception if the row cannot be written
+   */
+  void collect(String[] record) throws Exception;
+}
Index: contrib/src/java/org/apache/hadoop/hive/contrib/mr/Mapper.java
===================================================================
--- contrib/src/java/org/apache/hadoop/hive/contrib/mr/Mapper.java (revision 0)
+++ contrib/src/java/org/apache/hadoop/hive/contrib/mr/Mapper.java (revision 0)
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.contrib.mr;
+
+/**
+ * Mapper.
+ */
+public interface Mapper {
+  /**
+   * Maps a single input row into zero or more intermediate rows.
+   *
+   * @param record the columns of one input row
+   * @param output sink that collects the mapped rows
+   * @throws Exception on error
+   */
+  void map(String[] record, Output output) throws Exception;
+}
Index: contrib/src/java/org/apache/hadoop/hive/contrib/mr/Reducer.java
===================================================================
--- contrib/src/java/org/apache/hadoop/hive/contrib/mr/Reducer.java (revision 0)
+++ contrib/src/java/org/apache/hadoop/hive/contrib/mr/Reducer.java (revision 0)
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.contrib.mr;
+
+import java.util.Iterator;
+
+/**
+ * Simple reducer interface.
+ */
+public interface Reducer {
+  /**
+   * Reduces all records that share a single key.
+   *
+   * Note that it is assumed that the key is the first column. Additionally,
+   * the key will be repeated as the first column in the records[] array.
+   *
+   * @param key key (first column) for this set of records
+   * @param records iterator over the records for this key; the first column of
+   *          each record is the key itself
+   * @param output sink that collects the reduced rows
+   * @throws Exception on error
+   */
+  void reduce(String key, Iterator<String[]> records, Output output) throws Exception;
+}
Index: contrib/src/java/org/apache/hadoop/hive/contrib/mr/GenericMR.java
===================================================================
--- contrib/src/java/org/apache/hadoop/hive/contrib/mr/GenericMR.java (revision 0)
+++ contrib/src/java/org/apache/hadoop/hive/contrib/mr/GenericMR.java (revision 0)
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.contrib.mr;
+
+import java.io.BufferedReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.io.Reader;
+import java.io.Writer;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+import org.apache.commons.lang.StringUtils;
+
+/**
+ * This class attempts to provide a simple framework for writing Hive map/reduce
+ * tasks in java.
+ *
+ * The main benefit is that it deals with grouping the keys together for reduce
+ * tasks.
+ *
+ * Additionally, it deals with all system io... and provides something closer to
+ * the hadoop m/r.
+ *
+ * As an example, here's the wordcount reduce:
+ *
+ * new GenericMR().reduce(System.in, System.out, new Reducer() { public void
+ * reduce(String key, Iterator&lt;String[]&gt; records, Output output) throws
+ * Exception { int count = 0;
+ *
+ * while (records.hasNext()) { count += Integer.parseInt(records.next()[1]); }
+ *
+ * output.collect(new String[] { key, String.valueOf(count) }); } });
+ *
+ */
+public final class GenericMR {
+  /** Maps records from in to out. */
+  public void map(final InputStream in, final OutputStream out, final Mapper mapper) throws Exception {
+    // NOTE(review): InputStreamReader/OutputStreamWriter use the platform
+    // default charset here; confirm that is intended for streaming jobs.
+    map(new InputStreamReader(in), new OutputStreamWriter(out), mapper);
+  }
+
+  /** Applies mapper to every tab-separated record read from in. */
+  public void map(final Reader in, final Writer out, final Mapper mapper) throws Exception {
+    handle(in, out, new RecordProcessor() {
+      @Override
+      public void processNext(RecordReader reader, Output output) throws Exception {
+        mapper.map(reader.next(), output);
+      }
+    });
+  }
+
+  /** Reduces records from in to out. */
+  public void reduce(final InputStream in, final OutputStream out, final Reducer reducer) throws Exception {
+    reduce(new InputStreamReader(in), new OutputStreamWriter(out), reducer);
+  }
+
+  /** Applies reducer to each group of consecutive records sharing a key. */
+  public void reduce(final Reader in, final Writer out, final Reducer reducer) throws Exception {
+    handle(in, out, new RecordProcessor() {
+      @Override
+      public void processNext(RecordReader reader, Output output) throws Exception {
+        reducer.reduce(reader.peek()[0], new KeyRecordIterator(reader.peek()[0], reader), output);
+      }
+    });
+  }
+
+  // Drives the read-process loop and guarantees both streams are closed.
+  private void handle(final Reader in, final Writer out, final RecordProcessor processor) throws Exception {
+    final RecordReader reader = new RecordReader(in);
+    final OutputStreamOutput output = new OutputStreamOutput(out);
+
+    try {
+      while (reader.hasNext()) {
+        processor.processNext(reader, output);
+      }
+    } finally {
+      try {
+        output.close();
+      } finally {
+        reader.close();
+      }
+    }
+  }
+
+  private interface RecordProcessor {
+    void processNext(final RecordReader reader, final Output output) throws Exception;
+  }
+
+  // Iterates over consecutive records that share the given key.
+  private static final class KeyRecordIterator implements Iterator<String[]> {
+    private final String key;
+    private final RecordReader reader;
+
+    private KeyRecordIterator(final String key, final RecordReader reader) {
+      this.key = key;
+      this.reader = reader;
+    }
+
+    @Override
+    public boolean hasNext() {
+      return (this.reader.hasNext() && this.key.equals(this.reader.peek()[0]));
+    }
+
+    @Override
+    public String[] next() {
+      if (!hasNext()) {
+        throw new NoSuchElementException();
+      }
+
+      return this.reader.next();
+    }
+
+    @Override
+    public void remove() {
+      throw new UnsupportedOperationException();
+    }
+  }
+
+  // Line-oriented reader with one record of lookahead (peek).
+  private static final class RecordReader {
+    private final BufferedReader reader;
+    private String[] next;
+
+    private RecordReader(final InputStream in) {
+      this(new InputStreamReader(in));
+    }
+
+    private RecordReader(final Reader in) {
+      this.reader = new BufferedReader(in);
+      this.next = readNext();
+    }
+
+    private String[] next() {
+      final String[] ret = next;
+
+      this.next = readNext();
+
+      return ret;
+    }
+
+    private String[] readNext() {
+      try {
+        final String line = this.reader.readLine();
+        return (line == null ? null : line.split("\t"));
+      } catch (final Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    private boolean hasNext() {
+      return next != null;
+    }
+
+    private String[] peek() {
+      return next;
+    }
+
+    private void close() throws Exception {
+      this.reader.close();
+    }
+  }
+
+  // Output implementation that joins columns with tabs and prints lines.
+  private static final class OutputStreamOutput implements Output {
+    private final PrintWriter out;
+
+    private OutputStreamOutput(final OutputStream out) {
+      this(new OutputStreamWriter(out));
+    }
+
+    private OutputStreamOutput(final Writer out) {
+      this.out = new PrintWriter(out);
+    }
+
+    public void close() throws Exception {
+      out.close();
+    }
+
+    @Override
+    public void collect(String[] record) throws Exception {
+      out.println(StringUtils.join(record, "\t"));
+    }
+  }
+}