HBASE-8202

MultiTableOutputFormat should support writing to another HBase cluster

    Details

    • Type: Improvement
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: mapreduce
    • Labels: None

      Description

      This was brought up by David Koch in the thread 'hbase.mapred.output.quorum ignored in Mapper job
      with HDFS source and HBase sink', where he wanted to import a file on HDFS from one cluster A (source)
      into HBase tables on a different cluster B (destination) using a Mapper job with an HBase sink.

      Here is my analysis:

      MultiTableOutputFormat doesn't extend TableOutputFormat, so the quorum handling shown below never applies to it:

      public class MultiTableOutputFormat extends OutputFormat<ImmutableBytesWritable, Mutation> {
        // ... (class body elided)


      Relevant configuration w.r.t. the output quorum is set up in TableOutputFormat#setConf():

        public void setConf(Configuration otherConf) {
          this.conf = HBaseConfiguration.create(otherConf);
          String tableName = this.conf.get(OUTPUT_TABLE);
          if(tableName == null || tableName.length() <= 0) {
            throw new IllegalArgumentException("Must specify table name");
          }
          String address = this.conf.get(QUORUM_ADDRESS);
          int zkClientPort = conf.getInt(QUORUM_PORT, 0);
          String serverClass = this.conf.get(REGION_SERVER_CLASS);
          String serverImpl = this.conf.get(REGION_SERVER_IMPL);
          try {
            if (address != null) {
              ZKUtil.applyClusterKeyToConf(this.conf, address);
            }
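
      For illustration, here is a minimal sketch (not a patch) of how the same handling could be applied on
      the MultiTableOutputFormat side. The class name RemoteClusterMultiTableOutputFormat is made up; it
      reuses TableOutputFormat's QUORUM_ADDRESS / QUORUM_PORT keys and applies the cluster key before the
      record writer opens its HTables:

        import java.io.IOException;

        import org.apache.hadoop.conf.Configuration;
        import org.apache.hadoop.hbase.HBaseConfiguration;
        import org.apache.hadoop.hbase.client.Mutation;
        import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
        import org.apache.hadoop.hbase.mapreduce.MultiTableOutputFormat;
        import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
        import org.apache.hadoop.hbase.zookeeper.ZKUtil;
        import org.apache.hadoop.mapreduce.RecordWriter;
        import org.apache.hadoop.mapreduce.TaskAttemptContext;

        /**
         * Sketch only (hypothetical class name): honour hbase.mapred.output.quorum the way
         * TableOutputFormat#setConf() does, so the writers connect to the destination cluster.
         */
        public class RemoteClusterMultiTableOutputFormat extends MultiTableOutputFormat {

          @Override
          public RecordWriter<ImmutableBytesWritable, Mutation> getRecordWriter(TaskAttemptContext context)
              throws IOException, InterruptedException {
            // Work on a copy so the input-side ZooKeeper settings stay untouched.
            Configuration conf = HBaseConfiguration.create(context.getConfiguration());
            String address = conf.get(TableOutputFormat.QUORUM_ADDRESS);      // hbase.mapred.output.quorum
            int zkClientPort = conf.getInt(TableOutputFormat.QUORUM_PORT, 0); // hbase.mapred.output.quorum.port
            if (address != null) {
              // Rewrites hbase.zookeeper.quorum (and related keys) from the cluster key.
              ZKUtil.applyClusterKeyToConf(conf, address);
            }
            if (zkClientPort > 0) {
              conf.setInt("hbase.zookeeper.property.clientPort", zkClientPort);
            }
            // Assumes MultiTableRecordWriter stays accessible to subclasses (protected nested class).
            // It now opens its HTable instances against the output quorum.
            return new MultiTableRecordWriter(conf, conf.getBoolean(WAL_PROPERTY, WAL_ON));
          }
        }

      With something along those lines in place, a job would only need -D mapreduce.outputformat.class
      pointed at that class. With the current MultiTableOutputFormat, hbase.mapred.output.quorum is
      silently ignored because the record writer is built from the unmodified job configuration.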
      

        Activity

        Nick Dimiduk added a comment -

        Thanks for opening the ticket.

        David Koch added a comment -

        Hello,

        I asked the original question on the mailing list. Here is a minimalist example to illustrate the behavior. Run with $quorum != $output_quorum for maximum effect.

        HBase version was 0.92.1-cdh4.1.1.

        Example.java
        package org.hbase.example;
        
        import java.io.IOException;
        
        import org.apache.hadoop.conf.Configuration;
        import org.apache.hadoop.conf.Configured;
        import org.apache.hadoop.hbase.KeyValue;
        import org.apache.hadoop.hbase.client.Put;
        import org.apache.hadoop.hbase.client.Result;
        import org.apache.hadoop.hbase.client.Scan;
        import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
        import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
        import org.apache.hadoop.hbase.mapreduce.TableMapper;
        import org.apache.hadoop.mapreduce.Job;
        import org.apache.hadoop.util.Tool;
        import org.apache.hadoop.util.ToolRunner;
        
        /**
         * Test to show how hbase.mapred.output.quorum setting is ignored with {@link MultiTableOutputFormat}.
         * 
         * @author davidkoch
         * 
         * See: https://issues.apache.org/jira/browse/HBASE-8202
         * 
         * Hadoop/HBase configurations are read from command line. Replace environment variables below.
         * 
         * 1. Test with {@link TableOutputFormat} (Ok):
         *    
         *      hadoop jar $jar_name org.hbase.example.Example \
         *      -D hbase.zookeeper.quorum=$quorum \
         *      -D hbase.zookeeper.property.clientPort=2181 \
         *      -D hbase.mapreduce.inputtable=$input_table \
         *      -D hbase.mapreduce.scan.column.family=$colfam \
         *      -D hbase.mapred.outputtable=$output_table \
         *      -D mapreduce.outputformat.class=org.apache.hadoop.hbase.mapreduce.TableOutputFormat \
         *      -D hbase.mapred.output.quorum=$output_quorum:2181:/hbase
         * 
         * 2. Test with {@link MultiTableOutputFormat} (Fails):
         * 
         *      hadoop jar $jar_name org.hbase.example.Example \
         *      -D hbase.zookeeper.quorum=$quorum \
         *      -D hbase.zookeeper.property.clientPort=2181 \
         *      -D hbase.mapreduce.inputtable=$input_table \
         *      -D hbase.mapreduce.scan.column.family=$colfam \
         *      -D hbase.mapred.outputtable=$output_table \
         *      -D mapreduce.outputformat.class=org.apache.hadoop.hbase.mapreduce.MultiTableOutputFormat \
         *      -D hbase.mapred.output.quorum=$output_quorum:2181:/hbase
         * 
         * In the second example, the job itself will not fail if $output_table exists on $quorum but $output_quorum will
         * be ignored.
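         *
         * (As far as I can tell, this is because MultiTableOutputFormat's record writer creates its
         * HTable instances from the plain job configuration, so the Puts silently land on $quorum.)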
         */
        public class Example extends Configured implements Tool {
        
            public static class ExampleMapper extends TableMapper<ImmutableBytesWritable, Put> {
                ImmutableBytesWritable tableName;
        
                @Override
                public void setup(Context context) {
                    tableName = new ImmutableBytesWritable(context.getConfiguration().get("hbase.mapred.outputtable")
                        .getBytes());
                }
                
                public void map(ImmutableBytesWritable row, Result value, Context context)
                    throws IOException, InterruptedException {
                    Put put = new Put(row.get());
                    for (KeyValue kv : value.raw()) {
                        put.add(kv);
                    }
                    context.write(tableName, put);
                }
            }
        
            public int run(String[] args) throws Exception {
                Configuration conf = getConf();
                
                Scan scan = new Scan();
                scan.addFamily(conf.get("hbase.mapreduce.scan.column.family").getBytes());
                String inTable =  conf.get("hbase.mapreduce.inputtable");
                
                Job job = new Job(conf);
                job.setJobName("Example-HBASE-8202");
                TableMapReduceUtil.initTableMapperJob(inTable, scan, ExampleMapper.class, null, null, job);
                job.setJarByClass(Example.class);
                job.setNumReduceTasks(0);
                
                return job.waitForCompletion(true) ? 0 : 1;
            }
        
            public static void main(String[] args) throws Exception {
                int res = ToolRunner.run(new Example(), args);
                System.exit(res);
            }
        }
        
        Ted Yu added a comment -

        Draft patch.

        Next step is to use David's sample code for verification.

        Ted Yu added a comment -

        David Koch:
        Any chance of trying the patch?


          People

          • Assignee: Ted Yu
          • Reporter: Ted Yu
          • Votes: 0
          • Watchers: 5
