Index: hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatMultipleInputs.java
===================================================================
--- hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatMultipleInputs.java	(revision 0)
+++ hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatMultipleInputs.java	(revision 0)
@@ -0,0 +1,263 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hive.hcatalog.mapreduce;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.util.HashSet;
+import java.util.Set;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.serde2.thrift.test.IntString;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hive.hcatalog.data.DefaultHCatRecord;
+import org.apache.hive.hcatalog.data.HCatRecord;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.transport.TIOStreamTransport;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestHCatMultipleInputs extends HCatBaseTest {
+
+  private boolean setUpComplete = false;
+
+  private final String[] test_tables = {"testsequence", "testtext"};
+
+  /**
+   * Create a table backed by a thrift-serialized sequence file, and a table
+   * stored as text with two partitions.
+   */
+  @Before
+  @Override
+  public void setUp() throws Exception {
+    super.setUp();
+    if (setUpComplete) {
+      return;
+    }
+    int cur_index = 0;
+
+    Path intStringSeq = new Path(TEST_DATA_DIR + "/data/" + test_tables[cur_index] + "/intString.seq");
+    LOG.info("Creating sequence file: " + intStringSeq);
+    SequenceFile.Writer seqFileWriter = SequenceFile.createWriter(
+        intStringSeq.getFileSystem(hiveConf), hiveConf, intStringSeq,
+        NullWritable.class, BytesWritable.class);
+
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    TIOStreamTransport transport = new TIOStreamTransport(out);
+    TBinaryProtocol protocol = new TBinaryProtocol(transport);
+
+    for (int i = 1; i <= 100; i++) {
+      out.reset();
+      IntString intString = new IntString(cur_index, Integer.toString(i), i);
+      intString.write(protocol);
+      BytesWritable bytesWritable = new BytesWritable(out.toByteArray());
+      seqFileWriter.append(NullWritable.get(), bytesWritable);
+    }
+
+    seqFileWriter.close();
+
+    // Now let's load this file into a new Hive table.
+    Assert.assertEquals(0, driver.run("drop table if exists " + test_tables[cur_index]).getResponseCode());
+    Assert.assertEquals(0, driver.run(
+        "create table " + test_tables[cur_index] + " " +
+        "row format serde 'org.apache.hadoop.hive.serde2.thrift.ThriftDeserializer' " +
+        "with serdeproperties ( " +
+        "  'serialization.class'='org.apache.hadoop.hive.serde2.thrift.test.IntString', " +
+        "  'serialization.format'='org.apache.thrift.protocol.TBinaryProtocol') " +
+        "stored as" +
+        " inputformat 'org.apache.hadoop.mapred.SequenceFileInputFormat'" +
+        " outputformat 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'")
+        .getResponseCode());
+    Assert.assertEquals(0, driver.run("load data local inpath '" + intStringSeq.getParent() +
+        "' into table " + test_tables[cur_index]).getResponseCode());
+
+    cur_index = 1;
+
+    Path testTxt1 = new Path(TEST_DATA_DIR + "/data/" + test_tables[cur_index] + "/test1.txt");
+    Path testTxt2 = new Path(TEST_DATA_DIR + "/data/" + test_tables[cur_index] + "/test2.txt");
+    LOG.info("Creating text file: " + testTxt1);
+    LOG.info("Creating text file: " + testTxt2);
+
+    BufferedWriter bw1 = new BufferedWriter(new OutputStreamWriter(testTxt1.getFileSystem(hiveConf).create(testTxt1)));
+    BufferedWriter bw2 = new BufferedWriter(new OutputStreamWriter(testTxt2.getFileSystem(hiveConf).create(testTxt2)));
+
+    for (int i = 1; i <= 100; i++) {
+      String row;
+      if (i % 2 == 0) {
+        row = i + ":" + 1 + "\n";
+        bw1.append(row);
+      } else {
+        row = i + ":" + 2 + "\n";
+        bw2.append(row);
+      }
+    }
+    bw1.close();
+    bw2.close();
+
+    Assert.assertEquals(0, driver.run("drop table if exists " + test_tables[cur_index]).getResponseCode());
+    Assert.assertEquals(0, driver.run(
+        "create table " + test_tables[cur_index] + "(index int, other int) partitioned by (mod int) " +
+        "row format delimited fields terminated by ':' stored as textfile")
+        .getResponseCode());
+    Assert.assertEquals(0, driver.run("load data local inpath '" + testTxt1 +
+        "' into table " + test_tables[cur_index] + " partition (mod='1')").getResponseCode());
+    Assert.assertEquals(0, driver.run("load data local inpath '" + testTxt2 +
+        "' into table " + test_tables[cur_index] + " partition (mod='2')").getResponseCode());
+
+    setUpComplete = true;
+  }
+
+  /**
+   * Reads every integer the job wrote to its output directory.
+   */
+  public Set<Integer> readFromPath() {
+    Set<Integer> resultSet = new HashSet<Integer>();
+    try {
+      FileSystem fs = FileSystem.get(new Configuration());
+      FileStatus[] status = fs.listStatus(new Path(TEST_DATA_DIR, "output"));
+      for (int i = 0; i < status.length; i++) {
+        // Only look at the part files written by the reducers.
+        if (!status[i].getPath().getName().startsWith("part")) {
+          continue;
+        }
+        BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(status[i].getPath())));
+        String line;
+        while ((line = reader.readLine()) != null) {
+          resultSet.add(Integer.parseInt(line.trim()));
+        }
+        reader.close();
+      }
+    } catch (IOException e) {
+      LOG.error("Failed to read job output", e);
+    }
+    return resultSet;
+  }
+
+  @Test
+  public void testHCatMultipleInputs() throws Exception {
+    Assert.assertTrue(runJob());
+    Set<Integer> expectedSet = new HashSet<Integer>();
+    expectedSet.add(0);
+    expectedSet.add(1);
+    expectedSet.add(2);
+    Assert.assertEquals(expectedSet, readFromPath());
+  }
+
+  private boolean runJob() throws Exception {
+    Configuration conf = new Configuration();
+
+    Job job = new Job(conf);
+    job.setJarByClass(this.getClass());
+
+    job.setOutputFormatClass(TextOutputFormat.class);
+
+    HCatMultipleInputs.addInput(job, test_tables[0], "default", null, SequenceMapper.class);
+    HCatMultipleInputs.addInput(job, test_tables[1], null, "mod=\"1\"", TextMapper1.class);
+    HCatMultipleInputs.addInput(job, test_tables[1], null, "mod=\"2\"", TextMapper2.class);
+
+    job.setMapOutputKeyClass(Text.class);
+    job.setMapOutputValueClass(DefaultHCatRecord.class);
+
+    Path path = new Path(TEST_DATA_DIR, "output");
+    if (path.getFileSystem(conf).exists(path)) {
+      path.getFileSystem(conf).delete(path, true);
+    }
+
+    TextOutputFormat.setOutputPath(job, path);
+
+    job.setReducerClass(MyReducer.class);
+
+    return job.waitForCompletion(true);
+  }
+
+  public static class SequenceMapper extends Mapper<NullWritable, HCatRecord, Text, HCatRecord> {
+    @Override
+    public void map(NullWritable key, HCatRecord value, Context context)
+        throws IOException, InterruptedException {
+
+      LOG.info("Record: " + value);
+
+      context.write(new Text(value.get(0).toString()), value);
+    }
+  }
+
+  public static class TextMapper1 extends Mapper<WritableComparable, HCatRecord, Text, HCatRecord> {
+    @Override
+    public void map(WritableComparable key, HCatRecord value, Context context)
+        throws IOException, InterruptedException {
+
+      HCatSplit split = (HCatSplit) context.getInputSplit();
+
+      LOG.info("Mapper 1");
+      String mod = value.getString("mod", split.getTableSchema());
+      LOG.info("Record: " + value);
+      context.write(new Text(mod), value);
+    }
+  }
+
+  public static class TextMapper2 extends Mapper<WritableComparable, HCatRecord, Text, HCatRecord> {
+    @Override
+    public void map(WritableComparable key, HCatRecord value, Context context)
+        throws IOException, InterruptedException {
+
+      HCatSplit split = (HCatSplit) context.getInputSplit();
+
+      LOG.info("Mapper 2");
+      String mod = value.getString("mod", split.getTableSchema());
+      LOG.info("Record: " + value);
+      context.write(new Text(mod), value);
+    }
+  }
+
+  public static class MyReducer extends Reducer<Text, HCatRecord, NullWritable, Text> {
+
+    @Override
+    protected void reduce(Text key, java.lang.Iterable<HCatRecord> values, Context context)
+        throws IOException, InterruptedException {
+
+      context.write(NullWritable.get(), key);
+    }
+  }
+}
Index: hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatMultipleInputs.java
===================================================================
--- hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatMultipleInputs.java	(revision 0)
+++ hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatMultipleInputs.java	(revision 0)
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hive.hcatalog.mapreduce;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class supports MapReduce jobs that use multiple HCatalog
+ * tables as input. Usage is similar to the MultipleInputs class,
+ * which allows a user to specify a different InputFormat for each input Path.
+ *
+ * Usage pattern for job submission:
+ *
+ * Configuration conf = new Configuration();
+ *
+ * Job job = new Job(conf);
+ * job.setJarByClass(this.getClass());
+ *
+ * job.setOutputFormatClass(TextOutputFormat.class);
+ *
+ * HCatMultipleInputs.addInput(job, test_table1, "default", null, SequenceMapper.class);
+ * HCatMultipleInputs.addInput(job, test_table2, null, "part='1'", TextMapper1.class);
+ * HCatMultipleInputs.addInput(job, test_table2, null, "part='2'", TextMapper2.class);
+ *
+ * job.setMapOutputKeyClass(Text.class);
+ * job.setMapOutputValueClass(DefaultHCatRecord.class);
+ *
+ * Path path = new Path(TEST_DATA_DIR, "output");
+ *
+ * TextOutputFormat.setOutputPath(job, path);
+ *
+ * job.setReducerClass(MyReducer.class);
+ *
+ * return job.waitForCompletion(true);
+ *
+ */
+
+public class HCatMultipleInputs {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(HCatMultipleInputs.class.getName());
+
+    public static final String CONF_TABLES    = "hcat.input.multi.tables";
+    public static final String CONF_INPUTINFO = "hcat.input.multi.inputinfo";
+
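+    /**
+     * Records one input in the job configuration. Each call appends the
+     * serialized {@link InputInfo} to the comma-separated {@link #CONF_TABLES}
+     * property and installs the delegating input format and mapper that expand
+     * it again on the task side. The encoding assumes that table names, database
+     * names and filters contain no ',' or ';' characters.
+     */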
+    private static void addInput(Job job, InputInfo info) {
+
+        Configuration conf = job.getConfiguration();
+
+        String newTableInfo = info.toString();
+
+        String storedTables = conf.get(CONF_TABLES);
+
+        conf.set(CONF_TABLES,
+            storedTables == null ? newTableInfo : storedTables + ","
+               + newTableInfo);
+
+        job.setInputFormatClass(HCatDelegatingInputFormat.class);
+
+        job.setMapperClass(HCatDelegatingMapper.class);
+    }
+
+    /**
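+     * Adds a table, optionally restricted by a partition filter, to the set of
+     * inputs for the job; records of that table will be read with the given
+     * mapper class.
+     *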
+     * @param job    The {@link Job} itself.
+     * @param table  Table name to read from, not null.
+     * @param dbName The database that the table belongs to. If null, the default database will be used.
+     * @param filter The partition filter that should be applied to the table, can be null.
+     * @param mapperClass The mapper that should be used for this input, not null.
+     *
+     */
+
+    @SuppressWarnings("rawtypes")
+    public static void addInput(Job job, String table, String dbName, String filter, Class<? extends Mapper> mapperClass) {
+        addInput(job, new InputInfo(table, dbName, filter, mapperClass));
+    }
+
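+    /**
+     * The {@link InputInfo} for each input travels with its splits: it is stored
+     * in the split's input storage handler properties so that the task side
+     * ({@link HCatDelegatingMapper} and the record reader) can recover the table,
+     * filter and mapper class for the split being processed.
+     */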
+    static void writeInputInfoToSplit(InputSplit split, InputInfo info) {
+        try {
+            writeInputInfoToSplit(InternalUtil.castToHCatSplit(split), info);
+        } catch (IOException e) {
+            LOGGER.error("Failed to write input info to split", e);
+        }
+    }
+
+    static void writeInputInfoToSplit(HCatSplit split, InputInfo info) {
+        Properties props = split.getPartitionInfo().getInputStorageHandlerProperties();
+        props.setProperty(HCatMultipleInputs.CONF_INPUTINFO, info.toString());
+    }
+
+    static InputInfo readInputInfoFromSplit(InputSplit split) {
+        try {
+            return readInputInfoFromSplit(InternalUtil.castToHCatSplit(split));
+        } catch (IOException e) {
+            LOGGER.error("Failed to read input info from split", e);
+        }
+        return null;
+    }
+
+    static InputInfo readInputInfoFromSplit(HCatSplit split) {
+
+        String inputInfoString = split.getPartitionInfo().getInputStorageHandlerProperties().getProperty(HCatMultipleInputs.CONF_INPUTINFO);
+
+        InputInfo input = InputInfo.fromString(inputInfoString);
+
+        return input;
+
+    }
+
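+    /**
+     * Decodes the comma-separated {@link #CONF_TABLES} property back into the
+     * set of inputs registered via {@link #addInput}.
+     */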
+    protected static Set<InputInfo> getTableInfoSet(Configuration conf) {
+        Set<InputInfo> tableInfoSet = new HashSet<InputInfo>();
+        String[] infos = conf.getStrings(CONF_TABLES);
+        for (String encodedInfo : infos) {
+            InputInfo info = InputInfo.fromString(encodedInfo);
+            tableInfoSet.add(info);
+        }
+        return tableInfoSet;
+    }
+
+    static class InputInfo {
+        public String table;
+        public String dbName;
+        public String filter;
+        public String mapperClass;
+
+        public InputInfo (String table, String dbName, String filter) {
+            this.table = table;
+            this.dbName = dbName;
+            this.filter = filter;
+            this.mapperClass = null;
+        }
+
+        @SuppressWarnings("rawtypes")
+        public InputInfo(String table, String dbName, String filter, Class<? extends Mapper> mapperClass) {
+            this.table = table;
+            this.dbName = dbName;
+            this.filter = filter;
+            if (mapperClass != null) {
+                this.mapperClass = mapperClass.getName();
+            } else {
+                this.mapperClass = null;
+            }
+        }
+
+        private InputInfo(String table, String dbName, String filter, String mapperClass) {
+            this.table = table;
+            this.dbName = dbName;
+            this.filter = filter;
+            this.mapperClass = mapperClass;
+        }
+
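+        /**
+         * Serialized as {@code table;dbName;filter;mapperClass}, with null fields
+         * written as the literal string "null"; {@link #fromString} reverses the
+         * encoding, so none of the fields may contain ';'.
+         */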
+        @Override
+        public String toString() {
+            return table+";"+dbName+";"+filter+";"+mapperClass;
+        }
+
+        public static InputInfo fromString(String ser) {
+            String split[] = ser.split(";");
+            for (int i = 0; i < split.length; i++) {
+                if (split[i] != null && split[i].equals("null")) {
+                    split[i] = null;
+                }
+            }
+            return new InputInfo(split[0], split[1], split[2], split[3]);
+        }
+    }
+
+}
Index: hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatDelegatingMapper.java
===================================================================
--- hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatDelegatingMapper.java	(revision 0)
+++ hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatDelegatingMapper.java	(revision 0)
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hive.hcatalog.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hive.hcatalog.mapreduce.HCatMultipleInputs.InputInfo;
+
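+/**
+ * A mapper that delegates each map task to the mapper class registered for the
+ * split's input via {@link HCatMultipleInputs#addInput}. The delegate is
+ * instantiated reflectively in {@link #setup} from the {@link InputInfo} carried
+ * by the split, and its {@code run} method then processes the split's records.
+ */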
+class HCatDelegatingMapper extends Mapper {
+    private Mapper mapper;
+
+    @Override
+    @SuppressWarnings("unchecked")
+    protected void setup(Context context)
+        throws IOException, InterruptedException {
+
+        InputInfo input = HCatMultipleInputs.readInputInfoFromSplit(context.getInputSplit());
+
+        Class<? extends Mapper> mapperClass;
+        try {
+            if (input.mapperClass != null) {
+
+                mapperClass = (Class<? extends Mapper>) Class.forName(input.mapperClass);
+
+            } else {
+                throw new NullPointerException("Mapper not set for input: "+input.table);
+            }
+            mapper = (Mapper) ReflectionUtils.newInstance(mapperClass, context.getConfiguration());
+        } catch (ClassNotFoundException e) {
+            throw new IOException("Delegate mapper class not found: " + input.mapperClass, e);
+        }
+    }
+
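+    /**
+     * Overridden so that, once {@link #setup} has resolved the delegate, the
+     * delegate's own {@code run} method consumes the records of this split.
+     */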
+    @Override
+    public void run(Context context)
+        throws IOException, InterruptedException {
+        setup(context);
+        mapper.run(context);
+        cleanup(context);
+    }
+}
Index: hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatDelegatingInputFormat.java
===================================================================
--- hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatDelegatingInputFormat.java	(revision 0)
+++ hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatDelegatingInputFormat.java	(revision 0)
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hive.hcatalog.mapreduce;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hive.hcatalog.data.HCatRecord;
+import org.apache.hive.hcatalog.mapreduce.HCatMultipleInputs.InputInfo;
+
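+/**
+ * An input format that produces splits for every table registered through
+ * {@link HCatMultipleInputs}: {@link HCatInputFormat} is configured once per
+ * input, and each resulting split is tagged with the {@link InputInfo} it
+ * originated from.
+ */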
+class HCatDelegatingInputFormat extends HCatBaseInputFormat {
+
+    private void setInput(Configuration conf, InputInfo info) throws IOException {
+        String table = info.table;
+        String dbName = info.dbName;
+        String filter = info.filter;
+        HCatInputFormat.setInput(conf, dbName, table).setFilter(filter);
+    }
+
+    @Override
+    public List<InputSplit> getSplits(JobContext jobContext)
+        throws IOException, InterruptedException {
+        ArrayList<InputSplit> result = new ArrayList<InputSplit>();
+
+        Configuration ctxConf = jobContext.getConfiguration();
+
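+        // Generate splits for each registered input against its own copy of the
+        // job configuration, so one table's input settings do not leak into another's.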
+        for (InputInfo input : HCatMultipleInputs.getTableInfoSet(ctxConf)) {
+            Configuration conf = new Configuration(ctxConf);
+
+            setInput(conf, input);
+
+            JobContext ctx = new JobContext(conf, jobContext.getJobID());
+            List<InputSplit> splits = super.getSplits(ctx);
+            for (InputSplit split : splits) {
+                HCatMultipleInputs.writeInputInfoToSplit(split, input);
+            }
+            result.addAll(splits);
+
+        }
+        return result;
+    }
+
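+    /**
+     * Re-applies the originating table's input configuration (recovered from the
+     * split) before delegating to the base class's record reader.
+     */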
+    @Override
+    public RecordReader<WritableComparable, HCatRecord> createRecordReader(
+            InputSplit split, TaskAttemptContext taskContext) throws IOException, InterruptedException {
+
+        InputInfo input = HCatMultipleInputs.readInputInfoFromSplit(split);
+
+        setInput(taskContext.getConfiguration(), input);
+
+        return super.createRecordReader(split, taskContext);
+    }
+
+}
+