Index: build-common.xml =================================================================== --- build-common.xml (revision 896319) +++ build-common.xml (working copy) @@ -311,7 +311,7 @@ + excludes="**/TestSerDe.class,**/*$*.class" /> Index: contrib/src/test/results/clientpositive/java_mr_example.q.out =================================================================== --- contrib/src/test/results/clientpositive/java_mr_example.q.out (revision 0) +++ contrib/src/test/results/clientpositive/java_mr_example.q.out (revision 0) @@ -0,0 +1,333 @@ +PREHOOK: query: FROM ( + FROM src + MAP value, key + USING 'java -cp ../build/contrib/hive_contrib.jar org.apache.hadoop.hive.contrib.mr.example.IdentityMapper' + AS k, v + CLUSTER BY k) map_output + REDUCE k, v + USING 'java -cp ../build/contrib/hive_contrib.jar org.apache.hadoop.hive.contrib.mr.example.WordCountReduce' + AS k, v +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: file:/opt/h3/hive-trunk/build/ql/tmp/1438670936/10000 +POSTHOOK: query: FROM ( + FROM src + MAP value, key + USING 'java -cp ../build/contrib/hive_contrib.jar org.apache.hadoop.hive.contrib.mr.example.IdentityMapper' + AS k, v + CLUSTER BY k) map_output + REDUCE k, v + USING 'java -cp ../build/contrib/hive_contrib.jar org.apache.hadoop.hive.contrib.mr.example.WordCountReduce' + AS k, v +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: file:/opt/h3/hive-trunk/build/ql/tmp/1438670936/10000 +val_0 0 +val_10 10 +val_100 200 +val_103 206 +val_104 208 +val_105 105 +val_11 11 +val_111 111 +val_113 226 +val_114 114 +val_116 116 +val_118 236 +val_119 357 +val_12 24 +val_120 240 +val_125 250 +val_126 126 +val_128 384 +val_129 258 +val_131 131 +val_133 133 +val_134 268 +val_136 136 +val_137 274 +val_138 552 +val_143 143 +val_145 145 +val_146 292 +val_149 298 +val_15 30 +val_150 150 +val_152 304 +val_153 153 +val_155 155 +val_156 156 +val_157 157 +val_158 158 +val_160 160 +val_162 162 +val_163 163 +val_164 328 +val_165 330 +val_166 166 +val_167 501 +val_168 168 +val_169 676 +val_17 17 +val_170 170 +val_172 344 +val_174 348 +val_175 350 +val_176 352 +val_177 177 +val_178 178 +val_179 358 +val_18 36 +val_180 180 +val_181 181 +val_183 183 +val_186 186 +val_187 561 +val_189 189 +val_19 19 +val_190 190 +val_191 382 +val_192 192 +val_193 579 +val_194 194 +val_195 390 +val_196 196 +val_197 394 +val_199 597 +val_2 2 +val_20 20 +val_200 400 +val_201 201 +val_202 202 +val_203 406 +val_205 410 +val_207 414 +val_208 624 +val_209 418 +val_213 426 +val_214 214 +val_216 432 +val_217 434 +val_218 218 +val_219 438 +val_221 442 +val_222 222 +val_223 446 +val_224 448 +val_226 226 +val_228 228 +val_229 458 +val_230 1150 +val_233 466 +val_235 235 +val_237 474 +val_238 476 +val_239 478 +val_24 48 +val_241 241 +val_242 484 +val_244 244 +val_247 247 +val_248 248 +val_249 249 +val_252 252 +val_255 510 +val_256 512 +val_257 257 +val_258 258 +val_26 52 +val_260 260 +val_262 262 +val_263 263 +val_265 530 +val_266 266 +val_27 27 +val_272 544 +val_273 819 +val_274 274 +val_275 275 +val_277 1108 +val_278 556 +val_28 28 +val_280 560 +val_281 562 +val_282 564 +val_283 283 +val_284 284 +val_285 285 +val_286 286 +val_287 287 +val_288 576 +val_289 289 +val_291 291 +val_292 292 +val_296 296 +val_298 894 +val_30 30 +val_302 302 +val_305 305 +val_306 306 +val_307 614 +val_308 308 +val_309 618 +val_310 310 +val_311 933 +val_315 315 +val_316 948 +val_317 634 +val_318 954 +val_321 642 +val_322 644 +val_323 323 +val_325 650 +val_327 981 +val_33 33 +val_331 662 +val_332 332 +val_333 666 
+val_335 335 +val_336 336 +val_338 338 +val_339 339 +val_34 34 +val_341 341 +val_342 684 +val_344 688 +val_345 345 +val_348 1740 +val_35 105 +val_351 351 +val_353 706 +val_356 356 +val_360 360 +val_362 362 +val_364 364 +val_365 365 +val_366 366 +val_367 734 +val_368 368 +val_369 1107 +val_37 74 +val_373 373 +val_374 374 +val_375 375 +val_377 377 +val_378 378 +val_379 379 +val_382 764 +val_384 1152 +val_386 386 +val_389 389 +val_392 392 +val_393 393 +val_394 394 +val_395 790 +val_396 1188 +val_397 794 +val_399 798 +val_4 4 +val_400 400 +val_401 2005 +val_402 402 +val_403 1209 +val_404 808 +val_406 1624 +val_407 407 +val_409 1227 +val_41 41 +val_411 411 +val_413 826 +val_414 828 +val_417 1251 +val_418 418 +val_419 419 +val_42 84 +val_421 421 +val_424 848 +val_427 427 +val_429 858 +val_43 43 +val_430 1290 +val_431 1293 +val_432 432 +val_435 435 +val_436 436 +val_437 437 +val_438 1314 +val_439 878 +val_44 44 +val_443 443 +val_444 444 +val_446 446 +val_448 448 +val_449 449 +val_452 452 +val_453 453 +val_454 1362 +val_455 455 +val_457 457 +val_458 916 +val_459 918 +val_460 460 +val_462 924 +val_463 926 +val_466 1398 +val_467 467 +val_468 1872 +val_469 2345 +val_47 47 +val_470 470 +val_472 472 +val_475 475 +val_477 477 +val_478 956 +val_479 479 +val_480 1440 +val_481 481 +val_482 482 +val_483 483 +val_484 484 +val_485 485 +val_487 487 +val_489 1956 +val_490 490 +val_491 491 +val_492 984 +val_493 493 +val_494 494 +val_495 495 +val_496 496 +val_497 497 +val_498 1494 +val_5 15 +val_51 102 +val_53 53 +val_54 54 +val_57 57 +val_58 116 +val_64 64 +val_65 65 +val_66 66 +val_67 134 +val_69 69 +val_70 210 +val_72 144 +val_74 74 +val_76 152 +val_77 77 +val_78 78 +val_8 8 +val_80 80 +val_82 82 +val_83 166 +val_84 168 +val_85 85 +val_86 86 +val_87 87 +val_9 9 +val_90 270 +val_92 92 +val_95 190 +val_96 96 +val_97 194 +val_98 196 Index: contrib/src/test/org/apache/hadoop/hive/contrib/mr/TestGenericMR.java =================================================================== --- contrib/src/test/org/apache/hadoop/hive/contrib/mr/TestGenericMR.java (revision 0) +++ contrib/src/test/org/apache/hadoop/hive/contrib/mr/TestGenericMR.java (revision 0) @@ -0,0 +1,130 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.hadoop.hive.contrib.mr;
+
+import java.io.StringReader;
+import java.io.StringWriter;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+import junit.framework.TestCase;
+
+
+public final class TestGenericMR extends TestCase {
+  public void testReduceTooFar() throws Exception {
+    try {
+      new GenericMR().reduce(new StringReader("a\tb\tc"), new StringWriter(), new Reducer() {
+        public void reduce(String key, Iterator records, Output output) throws Exception {
+          while (true) {
+            records.next();
+          }
+        }
+      });
+    } catch (final NoSuchElementException nsee) {
+      // expected
+      return;
+    }
+
+    fail("Expected NoSuchElementException");
+  }
+
+  public void testEmptyMap() throws Exception {
+    final StringWriter out = new StringWriter();
+
+    new GenericMR().map(new StringReader(""), out, identityMapper());
+
+    assertEquals(0, out.toString().length());
+  }
+
+  public void testIdentityMap() throws Exception {
+    final String in = "a\tb\nc\td";
+    final StringWriter out = new StringWriter();
+
+    new GenericMR().map(new StringReader(in), out, identityMapper());
+
+    assertEquals(in + "\n", out.toString());
+  }
+
+  public void testKVSplitMap() throws Exception {
+    final String in = "k1=v1,k2=v2\nk1=v2,k2=v3";
+    final String expected = "k1\tv1\nk2\tv2\nk1\tv2\nk2\tv3\n";
+    final StringWriter out = new StringWriter();
+
+    new GenericMR().map(new StringReader(in), out, new Mapper() {
+      public void map(String[] record, Output output) throws Exception {
+        for (final String kvs : record[0].split(",")) {
+          final String[] kv = kvs.split("=");
+          output.collect(new String[] { kv[0], kv[1] });
+        }
+      }
+    });
+
+    assertEquals(expected, out.toString());
+  }
+
+  public void testIdentityReduce() throws Exception {
+    final String in = "a\tb\nc\td";
+    final StringWriter out = new StringWriter();
+
+    new GenericMR().reduce(new StringReader(in), out, identityReducer());
+
+    assertEquals(in + "\n", out.toString());
+  }
+
+  public void testWordCountReduce() throws Exception {
+    final String in = "hello\t1\nhello\t2\nokay\t4\nokay\t6\nokay\t2";
+    final StringWriter out = new StringWriter();
+
+    new GenericMR().reduce(new StringReader(in), out, new Reducer() {
+      @Override
+      public void reduce(String key, Iterator records, Output output) throws Exception {
+        int count = 0;
+
+        while (records.hasNext()) {
+          count += Integer.parseInt(records.next()[1]);
+        }
+
+        output.collect(new String[] { key, String.valueOf(count) });
+      }
+    });
+
+    final String expected = "hello\t3\nokay\t12\n";
+
+    assertEquals(expected, out.toString());
+  }
+
+  private Mapper identityMapper() {
+    return new Mapper() {
+      @Override
+      public void map(String[] record, Output output) throws Exception {
+        output.collect(record);
+      }
+    };
+  }
+
+  private Reducer identityReducer() {
+    return new Reducer() {
+      @Override
+      public void reduce(String key, Iterator records, Output output) throws Exception {
+        while (records.hasNext()) {
+          output.collect(records.next());
+        }
+      }
+    };
+  }
+}
Index: contrib/src/test/queries/clientpositive/java_mr_example.q
===================================================================
--- contrib/src/test/queries/clientpositive/java_mr_example.q (revision 0)
+++ contrib/src/test/queries/clientpositive/java_mr_example.q (revision 0)
@@ -0,0 +1,10 @@
+FROM (
+  FROM src
+  MAP value, key
+  USING 'java -cp ../build/contrib/hive_contrib.jar org.apache.hadoop.hive.contrib.mr.example.IdentityMapper'
+  AS k, v
+  CLUSTER BY k) map_output
+  REDUCE k, v
+  USING 'java -cp ../build/contrib/hive_contrib.jar org.apache.hadoop.hive.contrib.mr.example.WordCountReduce'
+  AS k, v
+;
\ No newline at end of file
Index: contrib/src/java/org/apache/hadoop/hive/contrib/mr/Output.java
===================================================================
--- contrib/src/java/org/apache/hadoop/hive/contrib/mr/Output.java (revision 0)
+++ contrib/src/java/org/apache/hadoop/hive/contrib/mr/Output.java (revision 0)
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.contrib.mr;
+/**
+ * Collects output.
+ *
+ * It's the responsibility of the caller to ensure that the output
+ * is in the correct format (contains the correct number of columns, etc.).
+ */
+public interface Output {
+  /**
+   * Add a row to the output.
+   *
+   * @param record the row to emit
+   * @throws Exception on error
+   */
+  void collect(String[] record) throws Exception;
+}
Index: contrib/src/java/org/apache/hadoop/hive/contrib/mr/Mapper.java
===================================================================
--- contrib/src/java/org/apache/hadoop/hive/contrib/mr/Mapper.java (revision 0)
+++ contrib/src/java/org/apache/hadoop/hive/contrib/mr/Mapper.java (revision 0)
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.contrib.mr;
+/**
+ * Mapper.
+ */
+public interface Mapper {
+  /**
+   * Maps a single input row into zero or more intermediate rows.
+   *
+   * @param record input record
+   * @param output collector for mapped rows
+   * @throws Exception on error
+   */
+  void map(String[] record, Output output) throws Exception;
+}
Index: contrib/src/java/org/apache/hadoop/hive/contrib/mr/Reducer.java
===================================================================
--- contrib/src/java/org/apache/hadoop/hive/contrib/mr/Reducer.java (revision 0)
+++ contrib/src/java/org/apache/hadoop/hive/contrib/mr/Reducer.java (revision 0)
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.contrib.mr;
+
+import java.util.Iterator;
+
+/**
+ * Simple reducer interface.
+ */
+public interface Reducer {
+  /**
+   * Reduce.
+   *
+   * Note that it is assumed that the key is the first column. Additionally, the key
+   * will be repeated as the first column in the records[] array.
+   *
+   * @param key key (first column) for this set of records
+   * @param records Iterator of records for this key. Note that the first column of each record will also be the key.
+   * @param output collector for reduced rows
+   * @throws Exception on error
+   */
+  void reduce(String key, Iterator records, Output output) throws Exception;
+}
Index: contrib/src/java/org/apache/hadoop/hive/contrib/mr/example/WordCountReduce.java
===================================================================
--- contrib/src/java/org/apache/hadoop/hive/contrib/mr/example/WordCountReduce.java (revision 0)
+++ contrib/src/java/org/apache/hadoop/hive/contrib/mr/example/WordCountReduce.java (revision 0)
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.contrib.mr.example;
+
+import java.util.Iterator;
+
+import org.apache.hadoop.hive.contrib.mr.GenericMR;
+import org.apache.hadoop.hive.contrib.mr.Output;
+import org.apache.hadoop.hive.contrib.mr.Reducer;
+
+/**
+ * Example Reducer (WordCount).
+ */
+public final class WordCountReduce {
+  public static void main(final String[] args) throws Exception {
+    new GenericMR().reduce(System.in, System.out, new Reducer() {
+      public void reduce(String key, Iterator records, Output output) throws Exception {
+        int count = 0;
+
+        while (records.hasNext()) {
+          // note we use col[1] -- the key is provided again as col[0]
+          count += Integer.parseInt(records.next()[1]);
+        }
+
+        output.collect(new String[] { key, String.valueOf(count) });
+      }
+    });
+  }
+}
Index: contrib/src/java/org/apache/hadoop/hive/contrib/mr/example/IdentityMapper.java
===================================================================
--- contrib/src/java/org/apache/hadoop/hive/contrib/mr/example/IdentityMapper.java (revision 0)
+++ contrib/src/java/org/apache/hadoop/hive/contrib/mr/example/IdentityMapper.java (revision 0)
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.contrib.mr.example;
+
+import org.apache.hadoop.hive.contrib.mr.GenericMR;
+import org.apache.hadoop.hive.contrib.mr.Mapper;
+import org.apache.hadoop.hive.contrib.mr.Output;
+
+/**
+ * Example Mapper (Identity).
+ */
+public final class IdentityMapper {
+  public static void main(final String[] args) throws Exception {
+    new GenericMR().map(System.in, System.out, new Mapper() {
+      @Override
+      public void map(final String[] record, final Output output) throws Exception {
+        output.collect(record);
+      }
+    });
+  }
+}
Index: contrib/src/java/org/apache/hadoop/hive/contrib/mr/GenericMR.java
===================================================================
--- contrib/src/java/org/apache/hadoop/hive/contrib/mr/GenericMR.java (revision 0)
+++ contrib/src/java/org/apache/hadoop/hive/contrib/mr/GenericMR.java (revision 0)
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.contrib.mr;
+
+import java.io.BufferedReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.io.Reader;
+import java.io.Writer;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+/**
+ * This class provides a simple framework for writing Hive map/reduce
+ * tasks in Java.
+ *
+ * The main benefit is that it takes care of grouping the records for each key
+ * together for reduce tasks.
+ *
+ * Additionally, it handles all of the system I/O and provides an interface
+ * that is closer to Hadoop's map/reduce API.
+ *
+ * As an example, here's the word count reducer:
+ *
+ * new GenericMR().reduce(System.in, System.out, new Reducer() {
+ *   public void reduce(String key, Iterator records, Output output) throws Exception {
+ *     int count = 0;
+ *
+ *     while (records.hasNext()) {
+ *       count += Integer.parseInt(records.next()[1]);
+ *     }
+ *
+ *     output.collect(new String[] { key, String.valueOf(count) });
+ *   }});
+ */
+public final class GenericMR {
+  public void map(final InputStream in, final OutputStream out, final Mapper mapper) throws Exception {
+    map(new InputStreamReader(in), new OutputStreamWriter(out), mapper);
+  }
+
+  public void map(final Reader in, final Writer out, final Mapper mapper) throws Exception {
+    handle(in, out, new RecordProcessor() {
+      @Override
+      public void processNext(RecordReader reader, Output output) throws Exception {
+        mapper.map(reader.next(), output);
+      }
+    });
+  }
+
+  public void reduce(final InputStream in, final OutputStream out, final Reducer reducer) throws Exception {
+    reduce(new InputStreamReader(in), new OutputStreamWriter(out), reducer);
+  }
+
+  public void reduce(final Reader in, final Writer out, final Reducer reducer) throws Exception {
+    handle(in, out, new RecordProcessor() {
+      @Override
+      public void processNext(RecordReader reader, Output output) throws Exception {
+        reducer.reduce(reader.peek()[0], new KeyRecordIterator(reader.peek()[0], reader), output);
+      }
+    });
+  }
+
+  private void handle(final Reader in, final Writer out, final RecordProcessor processor) throws Exception {
+    final RecordReader reader = new RecordReader(in);
+    final OutputStreamOutput output = new OutputStreamOutput(out);
+
+    try {
+      while (reader.hasNext()) {
+        processor.processNext(reader, output);
+      }
+    } finally {
+      try {
+        output.close();
+      } finally {
+        reader.close();
+      }
+    }
+  }
+
+  private static interface RecordProcessor {
+    void processNext(final RecordReader reader, final Output output) throws Exception;
+  }
+
+  private static final class KeyRecordIterator implements Iterator {
+    private final String key;
+    private final RecordReader reader;
+
+    private KeyRecordIterator(final String key, final RecordReader reader) {
+      this.key = key;
+      this.reader = reader;
+    }
+
+    @Override
+    public boolean hasNext() {
+      return (this.reader.hasNext() && this.key.equals(this.reader.peek()[0]));
+    }
+
+    @Override
+    public String[] next() {
+      if (!hasNext()) {
+        throw new NoSuchElementException();
+      }
+
+      return this.reader.next();
+    }
+
+    @Override
+    public void remove() {
+      throw new UnsupportedOperationException();
+    }
+  }
+
+  private static final class RecordReader {
+    private final BufferedReader reader;
+    private String[] next;
+
+    private RecordReader(final InputStream in) {
+      this(new InputStreamReader(in));
+    }
+
+    private RecordReader(final Reader in) {
+      this.reader = new BufferedReader(in);
+      this.next = readNext();
+    }
+
+    private String[] next() {
+      final String[] ret = next;
+
+      this.next = readNext();
+
+      return ret;
+    }
+
+    private String[] readNext() {
+      try {
+        final String line = this.reader.readLine();
+        return (line == null ? null : line.split("\t"));
+      } catch (final Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    private boolean hasNext() {
+      return next != null;
+    }
+
+    private String[] peek() {
+      return next;
+    }
+
+    private void close() throws Exception {
+      this.reader.close();
+    }
+  }
+
+  private static final class OutputStreamOutput implements Output {
+    private final PrintWriter out;
+
+    private OutputStreamOutput(final OutputStream out) {
+      this(new OutputStreamWriter(out));
+    }
+
+    private OutputStreamOutput(final Writer out) {
+      this.out = new PrintWriter(out);
+    }
+
+    public void close() throws Exception {
+      out.close();
+    }
+
+    @Override
+    public void collect(String[] record) throws Exception {
+      out.println(_join(record, "\t"));
+    }
+
+    private static String _join(final String[] record, final String separator) {
+      if (record == null || record.length == 0) {
+        return "";
+      }
+      final StringBuilder sb = new StringBuilder();
+      for (int i = 0; i < record.length; i++) {
+        if (i > 0) {
+          sb.append(separator);
+        }
+        sb.append(record[i]);
+      }
+      return sb.toString();
+    }
+  }
+}
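
For illustration only (not part of this patch): a minimal sketch of a word-splitting mapper written against the GenericMR, Mapper, and Output API added above. The class name WordSplitMapper and the whitespace tokenization are hypothetical; paired with WordCountReduce, it could replace IdentityMapper in java_mr_example.q to produce a true word count.

// Illustrative sketch only -- not part of this patch. WordSplitMapper is a
// hypothetical class built on the GenericMR API introduced above.
package org.apache.hadoop.hive.contrib.mr.example;

import org.apache.hadoop.hive.contrib.mr.GenericMR;
import org.apache.hadoop.hive.contrib.mr.Mapper;
import org.apache.hadoop.hive.contrib.mr.Output;

public final class WordSplitMapper {
  public static void main(final String[] args) throws Exception {
    new GenericMR().map(System.in, System.out, new Mapper() {
      @Override
      public void map(final String[] record, final Output output) throws Exception {
        // record[0] is the first tab-separated column handed over by Hive's MAP clause.
        for (final String word : record[0].split("\\s+")) {
          if (word.length() > 0) {
            // Emit (word, 1); WordCountReduce can then sum the counts per word.
            output.collect(new String[] { word, "1" });
          }
        }
      }
    });
  }
}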