diff --git hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatDelegatingInputFormat.java hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatDelegatingInputFormat.java
new file mode 100644
index 0000000..72af85a
--- /dev/null
+++ hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatDelegatingInputFormat.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hive.hcatalog.mapreduce;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hive.hcatalog.common.HCatConstants;
+import org.apache.hive.hcatalog.common.HCatUtil;
+import org.apache.hive.hcatalog.data.HCatRecord;
+
+class HCatDelegatingInputFormat extends HCatBaseInputFormat {
+
+  private void setInput(Configuration conf, InputJobInfo info) throws IOException {
+    conf.set(
+        HCatConstants.HCAT_KEY_JOB_INFO,
+        HCatUtil.serialize(info));
+  }
+
+  @Override
+  public List<InputSplit> getSplits(JobContext jobContext)
+      throws IOException, InterruptedException {
+    ArrayList<InputSplit> result = new ArrayList<InputSplit>();
+
+    Configuration ctxConf = jobContext.getConfiguration();
+    ArrayList<InputJobInfo> inputJobInfoList = (ArrayList<InputJobInfo>)
+        HCatUtil.deserialize(ctxConf.get(HCatMultipleInputs.HCAT_KEY_MULTI_INPUT_JOBS_INFO));
+
+    for (int index = 0; index < inputJobInfoList.size(); index++) {
+      InputJobInfo info = inputJobInfoList.get(index);
+      Configuration conf = new Configuration(ctxConf);
+      setInput(conf, info);
+      JobContext ctx = ShimLoader.getHadoopShims().getHCatShim().createJobContext(conf, jobContext.getJobID());
+      List<InputSplit> splits = super.getSplits(ctx);
+      for (InputSplit split : splits) {
+        HCatMultipleInputs.writeInputJobInfoIndexToSplit(split, index);
+      }
+      result.addAll(splits);
+    }
+    return result;
+  }
+
+  @Override
+  public RecordReader<WritableComparable, HCatRecord> createRecordReader(
+      InputSplit split, TaskAttemptContext taskContext) throws IOException, InterruptedException {
+
+    InputJobInfo info = HCatMultipleInputs.readInputJobInfoFromSplit(taskContext.getConfiguration(), split);
+    setInput(taskContext.getConfiguration(), info);
+
+    return super.createRecordReader(split, taskContext);
+  }
+
+}
+
diff --git hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatDelegatingMapper.java hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatDelegatingMapper.java
new file mode 100644
index 0000000..d4bef16
--- /dev/null
+++ hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatDelegatingMapper.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hive.hcatalog.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.util.ReflectionUtils;
+
+class HCatDelegatingMapper extends Mapper {
+  private Mapper mapper;
+
+  @Override
+  @SuppressWarnings("unchecked")
+  protected void setup(Context context)
+      throws IOException, InterruptedException {
+
+    String mapperClassStr = HCatMultipleInputs.readInputJobMapperFromSplit(
+        context.getConfiguration(), context.getInputSplit());
+
+    Class mapperClass;
+    try {
+      if (mapperClassStr != null) {
+        mapperClass = (Class) Class.forName(mapperClassStr);
+      } else {
+        throw new NullPointerException("Mapper not set for input");
+      }
+      mapper = (Mapper) ReflectionUtils.newInstance(mapperClass, context.getConfiguration());
+    } catch (ClassNotFoundException e) {
+      e.printStackTrace();
+    }
+  }
+
+  @Override
+  public void run(Context context)
+      throws IOException, InterruptedException {
+    setup(context);
+    mapper.run(context);
+    cleanup(context);
+  }
+}
diff --git hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatMultipleInputs.java hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatMultipleInputs.java
new file mode 100644
index 0000000..aec72ca
--- /dev/null
+++ hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatMultipleInputs.java
@@ -0,0 +1,176 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hive.hcatalog.mapreduce;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hive.hcatalog.common.HCatConstants;
+import org.apache.hive.hcatalog.common.HCatUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * This class supports MapReduce jobs that use multiple HCatalog
+ * tables as input. Usage is similar to the MultipleInputs class,
+ * which allows a user to specify a different InputFormat for each input Path.
+ *
+ * Usage pattern for job submission:
+ *
+ * Configuration conf = new Configuration();
+ *
+ * Job job = new Job(conf);
+ * job.setJarByClass(this.getClass());
+ *
+ * job.setOutputFormatClass(TextOutputFormat.class);
+ *
+ * HCatMultipleInputs.init(job);
+ * HCatMultipleInputs.addInput(test_table1, "default", null, SequenceMapper.class);
+ * HCatMultipleInputs.addInput(test_table2, null, "part='1'", TextMapper1.class);
+ * HCatMultipleInputs.addInput(test_table2, null, "part='2'", TextMapper2.class);
+ * HCatMultipleInputs.build();
+ *
+ * job.setMapOutputKeyClass(Text.class);
+ * job.setMapOutputValueClass(DefaultHCatRecord.class);
+ *
+ * Path path = new Path(TEST_DATA_DIR, "output");
+ *
+ * TextOutputFormat.setOutputPath(job, path);
+ *
+ * job.setReducerClass(MyReducer.class);
+ *
+ * return job.waitForCompletion(true);
+ */
+
+public class HCatMultipleInputs {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(HCatMultipleInputs.class.getName());
+
+  public static final String HCAT_KEY_MULTI_INPUT_JOBS_INFO = HCatConstants.HCAT_KEY_BASE +
+          ".multi.jobs.info";
+  public static final String HCAT_KEY_MULTI_INPUT_JOBS_INFO_MAPPER = HCatConstants.HCAT_KEY_BASE +
+          ".multi.jobs.info.mapper";
+
+  public static final String HCAT_KEY_MULTI_INPUT_JOB_INFO_INDEX = HCatConstants.HCAT_KEY_BASE +
+          ".multi.job.info.index";
+
+
+  private static Job job;
+  private static final ArrayList<InputJobInfo> inputJobInfoList = new ArrayList<InputJobInfo>();
+  private static final ArrayList<String> inputJobMapperList = new ArrayList<String>();
+
+
+
+  public static void init(Job _job) {
+    job = _job;
+    inputJobInfoList.clear();
+    inputJobMapperList.clear();
+  }
+
+  public static void build() throws IOException {
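+    // Serialize the accumulated InputJobInfo and mapper-class lists into the job
+    // configuration, and route all input through the delegating format and mapper.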
+    Configuration conf = job.getConfiguration();
+    conf.set(HCAT_KEY_MULTI_INPUT_JOBS_INFO, HCatUtil.serialize(inputJobInfoList));
+    conf.set(HCAT_KEY_MULTI_INPUT_JOBS_INFO_MAPPER, HCatUtil.serialize(inputJobMapperList));
+    job.setInputFormatClass(HCatDelegatingInputFormat.class);
+    job.setMapperClass(HCatDelegatingMapper.class);
+
+  }
+
+
+  /**
+   * @param table       The name of the table to read as input, not null.
+   * @param dbName      The database that the table belongs to. If null, the default database will be used.
+   * @param filter      The partition filter that should be applied to the table, can be null.
+   * @param mapperClass The mapper that should be used for this input, not null.
+   */
+
+  @SuppressWarnings("rawtypes")
+  public static void addInput(String table, String dbName, String filter, Class mapperClass) throws Exception {
+    String mapperClassStr = null;
+    if (mapperClass != null) {
+      mapperClassStr = mapperClass.getName();
+    }
+    addInput(table, dbName, filter, mapperClassStr);
+  }
+
+  @SuppressWarnings("rawtypes")
+  public static void addInput(String table, String dbName, String filter,
+                              String mapperClass) throws Exception {
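+    // Create the InputJobInfo for this table, populate it from the metastore
+    // (partitions, schema, storage handler), and remember which mapper handles it.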
+    Configuration conf = job.getConfiguration();
+    InputJobInfo info = InputJobInfo.create(dbName, table, filter, null);
+    info = InitializeInput.getInputJobInfoForMultiInputs(conf, info, null);
+    inputJobInfoList.add(info);
+    inputJobMapperList.add(mapperClass);
+  }
+
+  static void writeInputJobInfoIndexToSplit(InputSplit split, int index) {
+    try {
+      writeInputJobInfoIndexToSplit(InternalUtil.castToHCatSplit(split), index);
+    } catch (IOException e) {
+      LOGGER.error(e.getMessage());
+    }
+  }
+
+  static void writeInputJobInfoIndexToSplit(HCatSplit split, int index) {
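+    // Stash the index of the originating input in the split's storage-handler
+    // properties so it can be recovered on the task side.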
+    Properties props = split.getPartitionInfo().getInputStorageHandlerProperties();
+    props.setProperty(HCatMultipleInputs.HCAT_KEY_MULTI_INPUT_JOB_INFO_INDEX, String.valueOf(index));
+  }
+
+
+  static InputJobInfo readInputJobInfoFromSplit(Configuration conf, InputSplit split) throws IOException {
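+    // The index stored in the split selects the matching InputJobInfo from the
+    // serialized list in the job configuration.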
+    int index = readInputJobInfoIndexFromSplit(split);
+    ArrayList<InputJobInfo> _inputJobInfoList = (ArrayList<InputJobInfo>)
+            HCatUtil.deserialize(conf.get(HCatMultipleInputs.HCAT_KEY_MULTI_INPUT_JOBS_INFO));
+    return _inputJobInfoList.get(index);
+  }
+
+
+  static String readInputJobMapperFromSplit(Configuration conf, InputSplit split) throws IOException {
+    int index = readInputJobInfoIndexFromSplit(split);
+    ArrayList<String> _inputJobMapperList = (ArrayList<String>)
+            HCatUtil.deserialize(conf.get(HCAT_KEY_MULTI_INPUT_JOBS_INFO_MAPPER));
+    return _inputJobMapperList.get(index);
+  }
+
+  private static int readInputJobInfoIndexFromSplit(InputSplit split) {
+    try {
+      return readInputJobInfoIndexFromSplit(InternalUtil.castToHCatSplit(split));
+    } catch (IOException e) {
+      LOGGER.error(e.getMessage());
+    }
+    return -1;
+  }
+
+  private static int readInputJobInfoIndexFromSplit(HCatSplit split) {
+    return Integer.valueOf(split.getPartitionInfo().getInputStorageHandlerProperties()
+            .getProperty(HCatMultipleInputs.HCAT_KEY_MULTI_INPUT_JOB_INFO_INDEX));
+  }
+}
diff --git hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/InitializeInput.java hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/InitializeInput.java
index 1980ef5..306dfca 100644
--- hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/InitializeInput.java
+++ hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/InitializeInput.java
@@ -88,6 +88,18 @@ public static void setInput(Configuration conf,
       HCatUtil.serialize(getInputJobInfo(conf, inputJobInfo, null)));
   }
 
+
+  /**
+   * Returns the given InputJobInfo for a multiple-inputs job after populating it with data
+   * queried from the metadata service.
+   */
+  public static InputJobInfo getInputJobInfoForMultiInputs(Configuration conf,
+                                                           InputJobInfo inputJobInfo,
+                                                           String locationFilter) throws Exception {
+      return getInputJobInfo(conf, inputJobInfo, locationFilter);
+  }
+
   /**
    * Returns the given InputJobInfo after populating with data queried from the metadata service.
    */
diff --git hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/InputJobInfo.java hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/InputJobInfo.java
index 3aa7bd8..e8e04cb 100644
--- hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/InputJobInfo.java
+++ hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/InputJobInfo.java
@@ -21,7 +21,10 @@
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hive.hcatalog.common.HCatUtil;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
@@ -178,11 +181,15 @@ public Properties getProperties() {
   private void writeObject(ObjectOutputStream oos)
     throws IOException {
     oos.defaultWriteObject();
+
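+    // Compress the partition list into a separate buffer, then write it to the
+    // stream as an encoded string.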
     Deflater def = new Deflater(Deflater.BEST_COMPRESSION);
+    ByteArrayOutputStream bout = new ByteArrayOutputStream();
     ObjectOutputStream partInfoWriter =
-      new ObjectOutputStream(new DeflaterOutputStream(oos, def));
+            new ObjectOutputStream(new DeflaterOutputStream(bout, def));
     partInfoWriter.writeObject(partitions);
     partInfoWriter.close();
+    String partInfoEncodeStr = HCatUtil.encodeBytes(bout.toByteArray());
+    oos.writeObject(partInfoEncodeStr);
   }
 
   /**
@@ -194,8 +201,10 @@ private void writeObject(ObjectOutputStream oos)
   private void readObject(ObjectInputStream ois)
     throws IOException, ClassNotFoundException {
     ois.defaultReadObject();
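+    // Decode and decompress the partition list that writeObject stored as a string.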
+    String partInfoEncodeStr = (String)ois.readObject();
+    ByteArrayInputStream bin = new ByteArrayInputStream(HCatUtil.decodeBytes(partInfoEncodeStr));
     ObjectInputStream partInfoReader =
-      new ObjectInputStream(new InflaterInputStream(ois));
+      new ObjectInputStream(new InflaterInputStream(bin));
     partitions = (List)partInfoReader.readObject();
   }
 }
diff --git hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatMultipleInputs.java hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatMultipleInputs.java
new file mode 100644
index 0000000..458fb8c
--- /dev/null
+++ hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatMultipleInputs.java
@@ -0,0 +1,269 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hive.hcatalog.mapreduce;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Set;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.serde2.thrift.test.IntString;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hive.hcatalog.common.HCatUtil;
+import org.apache.hive.hcatalog.data.DefaultHCatRecord;
+import org.apache.hive.hcatalog.data.HCatRecord;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.transport.TIOStreamTransport;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestHCatMultipleInputs extends HCatBaseTest {
+
+  private boolean setUpComplete = false;
+
+  private final String[] test_tables = {"testsequence", "testtext"};
+
+
+  /**
+   * Create an input sequence file, a table stored as text
+   * with two partitions.
+   */
+  @Before
+  @Override
+  public void setUp() throws Exception {
+    super.setUp();
+    if (setUpComplete) {
+      return;
+    }
+    int cur_index = 0;
+
+    Path intStringSeq = new Path(TEST_DATA_DIR + "/data/"+test_tables[cur_index]+"/intString.seq");
+    LOG.info("Creating sequence file: " + intStringSeq);
+    SequenceFile.Writer seqFileWriter = SequenceFile.createWriter(
+      intStringSeq.getFileSystem(hiveConf), hiveConf, intStringSeq,
+      NullWritable.class, BytesWritable.class);
+
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    TIOStreamTransport transport = new TIOStreamTransport(out);
+    TBinaryProtocol protocol = new TBinaryProtocol(transport);
+
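+    // Write 100 Thrift-serialized IntString records into the sequence file.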
+    for (int i = 1; i <= 100; i++) {
+      out.reset();
+      IntString intString = new IntString(cur_index, Integer.toString(i), i);
+      intString.write(protocol);
+      BytesWritable bytesWritable = new BytesWritable(out.toByteArray());
+      seqFileWriter.append(NullWritable.get(), bytesWritable);
+    }
+
+    seqFileWriter.close();
+
+    // Now let's load this file into a new Hive table.
+    Assert.assertEquals(0, driver.run("drop table if exists "+test_tables[cur_index]).getResponseCode());
+    Assert.assertEquals(0, driver.run(
+      "create table "+ test_tables[cur_index] +" "+
+        "row format serde 'org.apache.hadoop.hive.serde2.thrift.ThriftDeserializer' " +
+        "with serdeproperties ( " +
+        "  'serialization.class'='org.apache.hadoop.hive.serde2.thrift.test.IntString', " +
+        "  'serialization.format'='org.apache.thrift.protocol.TBinaryProtocol') " +
+        "stored as" +
+        "  inputformat 'org.apache.hadoop.mapred.SequenceFileInputFormat'" +
+        "  outputformat 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'")
+      .getResponseCode());
+    Assert.assertEquals(0, driver.run("load data local inpath '" + intStringSeq.getParent() +
+      "' into table "+test_tables[cur_index]).getResponseCode());
+
+    cur_index = 1;
+
+    Path testTxt1 = new Path(TEST_DATA_DIR + "/data/"+test_tables[cur_index]+"/test1.txt");
+    Path testTxt2 = new Path(TEST_DATA_DIR + "/data/"+test_tables[cur_index]+"/test2.txt");
+    LOG.info("Creating text file: " + testTxt1);
+    LOG.info("Creating text file: " + testTxt2);
+
+    BufferedWriter bw1 = new BufferedWriter(new OutputStreamWriter(testTxt1.getFileSystem(hiveConf).create(testTxt1)));
+    BufferedWriter bw2 = new BufferedWriter(new OutputStreamWriter(testTxt2.getFileSystem(hiveConf).create(testTxt2)));
+
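+    // Write 100 "index:other" rows, sending even indexes to the file loaded into
+    // partition mod='1' and odd indexes to the file loaded into partition mod='2'.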
+    for (int i = 1; i <= 100; i++) {
+      String row = "";
+      if (i % 2 == 0) {
+        row = i+":"+1+"\n";
+        bw1.append(row);
+      } else {
+        row = i+":"+2+"\n";
+        bw2.append(row);
+      }
+    }
+    bw1.close();
+    bw2.close();
+
+
+    Assert.assertEquals(0, driver.run("drop table if exists "+test_tables[cur_index]).getResponseCode());
+
+    Assert.assertEquals(0, driver.run(
+      "create table "+ test_tables[cur_index] +"(index int, other int) partitioned by (mod " +
+              "string) row" +
+              " format delimited fields terminated by ':' stored as textfile ")
+      .getResponseCode());
+    Assert.assertEquals(0, driver.run("load data local inpath '" + testTxt1 +
+      "' into table "+test_tables[cur_index] + " partition (mod='1')").getResponseCode());
+    Assert.assertEquals(0, driver.run("load data local inpath '" + testTxt2 +
+      "' into table "+test_tables[cur_index] + " partition (mod='2')").getResponseCode());
+
+    setUpComplete = true;
+  }
+
+  public Set<Integer> readFromPath() {
+    Set<Integer> resultSet = new HashSet<Integer>();
+    try {
+      FileSystem fs = FileSystem.get(new Configuration());
+      FileStatus[] status = fs.listStatus(new Path(TEST_DATA_DIR, "output"));
+      for (int i = 0; i < status.length; i++) {
+        if (!status[i].getPath().getName().startsWith("part")) {
+          continue;
+        }
+        BufferedReader reader =
+                new BufferedReader(new InputStreamReader(fs.open(status[i].getPath())));
+        String line;
+        while ((line = reader.readLine()) != null) {
+          resultSet.add(Integer.valueOf(line.trim()));
+        }
+        reader.close();
+      }
+    } catch (IOException e) {
+      LOG.error(e.getMessage());
+    }
+    return resultSet;
+  }
+
+  @Test
+  public void testHCatMultipleInputs() throws Exception {
+    Assert.assertTrue(runJob());
+
+    Set<Integer> expectedSet = new HashSet<Integer>();
+    expectedSet.add(0);
+    expectedSet.add(1);
+    expectedSet.add(2);
+    Assert.assertEquals(expectedSet, readFromPath());
+  }
+
+  private boolean runJob() throws Exception {
+    Configuration conf = new Configuration();
+
+    Job job = new Job(conf);
+    job.setJarByClass(this.getClass());
+
+
+    job.setOutputFormatClass(TextOutputFormat.class);
+
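+    // Register three inputs: the sequence-backed table with SequenceMapper, and the
+    // two partitions of the text table, each with its own mapper.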
+    HCatMultipleInputs.init(job);
+    HCatMultipleInputs.addInput(test_tables[0], "default", null, SequenceMapper.class);
+    HCatMultipleInputs.addInput(test_tables[1], null, "mod=\"1\"", TextMapper1.class);
+    HCatMultipleInputs.addInput(test_tables[1], null, "mod=\"2\"", TextMapper2.class);
+    HCatMultipleInputs.build();
+
+
+    job.setMapOutputKeyClass(Text.class);
+    job.setMapOutputValueClass(DefaultHCatRecord.class);
+
+    Path path = new Path(TEST_DATA_DIR, "output");
+    if (path.getFileSystem(conf).exists(path)) {
+      path.getFileSystem(conf).delete(path, true);
+    }
+
+    TextOutputFormat.setOutputPath(job, path);
+
+    job.setReducerClass(MyReducer.class);
+
+
+    return job.waitForCompletion(true);
+  }
+
+  public static class SequenceMapper extends Mapper<NullWritable, HCatRecord, Text, HCatRecord> {
+    @Override
+    public void map(NullWritable key, HCatRecord value, Context context)
+      throws IOException, InterruptedException {
+
+      LOG.info("Record: "+value);
+
+      context.write(new Text(value.get(0).toString()), value);
+    }
+  }
+
+  public static class TextMapper1 extends Mapper<WritableComparable, HCatRecord, Text, HCatRecord> {
+    @Override
+    public void map(WritableComparable key, HCatRecord value, Context context)
+      throws IOException, InterruptedException {
+
+      HCatSplit split = (HCatSplit) context.getInputSplit();
+
+      LOG.info("Mapper 1");
+      String mod = value.getString("mod", split.getTableSchema());
+      LOG.info("Record: "+value);
+      context.write(new Text(mod), value);
+    }
+  }
+
+  public static class TextMapper2 extends Mapper<WritableComparable, HCatRecord, Text, HCatRecord> {
+    @Override
+    public void map(WritableComparable key, HCatRecord value, Context context)
+      throws IOException, InterruptedException {
+
+      HCatSplit split = (HCatSplit) context.getInputSplit();
+
+      LOG.info("Mapper 2");
+      String mod = value.getString("mod", split.getTableSchema());
+      LOG.info("Record: "+value);
+      context.write(new Text(mod), value);
+    }
+  }
+
+  public static class MyReducer extends Reducer<Text, HCatRecord, NullWritable, Text> {
+
+    @Override
+    protected void reduce(Text key,
+      java.lang.Iterable<HCatRecord> values,
+      org.apache.hadoop.mapreduce.Reducer<Text, HCatRecord, NullWritable, Text>.Context context)
+      throws IOException, InterruptedException {
+
+      context.write(NullWritable.get(), key);
+    }
+  }
+}