Index: src/main/java/org/apache/hadoop/hbase/mapreduce/Export.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/mapreduce/Export.java	(date 1299110957000)
+++ src/main/java/org/apache/hadoop/hbase/mapreduce/Export.java	(revision )
@@ -1,22 +1,22 @@
 /**
- * Copyright 2009 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+* Copyright 2009 The Apache Software Foundation
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
 package org.apache.hadoop.hbase.mapreduce;
 
 import java.io.IOException;
@@ -26,6 +26,11 @@
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.PrefixFilter;
+import org.apache.hadoop.hbase.filter.RowFilter;
+import org.apache.hadoop.hbase.filter.RegexStringComparator;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
+import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.mapreduce.Job;
@@ -36,10 +41,10 @@
 import org.apache.commons.logging.LogFactory;
 
 /**
- * Export an HBase table.
- * Writes content to sequence files up in HDFS.  Use {@link Import} to read it
- * back in again.
- */
+* Export an HBase table.
+* Writes content to sequence files up in HDFS.  Use {@link Import} to read it
+* back in again.
+*/
 public class Export {
   private static final Log LOG = LogFactory.getLog(Export.class);
   final static String NAME = "export";
@@ -84,31 +89,59 @@
     Job job = new Job(conf, NAME + "_" + tableName);
     job.setJobName(NAME + "_" + tableName);
     job.setJarByClass(Exporter.class);
-    // TODO: Allow passing filter and subset of rows/columns.
+    // Set optional scan parameters
+    Scan s = getConfiguredScanForJob(conf, args);
+    TableMapReduceUtil.initTableMapperJob(tableName, s, Exporter.class, null,
+      null, job);
+    // No reducers.  Just write straight to output files.
+    job.setNumReduceTasks(0);
+    job.setOutputFormatClass(SequenceFileOutputFormat.class);
+    job.setOutputKeyClass(ImmutableBytesWritable.class);
+    job.setOutputValueClass(Result.class);
+    FileOutputFormat.setOutputPath(job, outputDir);
+    return job;
+  }
+
+  private static Scan getConfiguredScanForJob(Configuration conf, String[] args) throws IOException {
     Scan s = new Scan();
     // Optional arguments.
+    // Set Scan Versions
     int versions = args.length > 2? Integer.parseInt(args[2]): 1;
     s.setMaxVersions(versions);
+    // Set Scan Range
     long startTime = args.length > 3? Long.parseLong(args[3]): 0L;
     long endTime = args.length > 4? Long.parseLong(args[4]): Long.MAX_VALUE;
     s.setTimeRange(startTime, endTime);
+    // Set cache blocks
     s.setCacheBlocks(false);
+    // Set Scan Column Family
     if (conf.get(TableInputFormat.SCAN_COLUMN_FAMILY) != null) {
       s.addFamily(Bytes.toBytes(conf.get(TableInputFormat.SCAN_COLUMN_FAMILY)));
     }
+    // Set RowFilter or Prefix Filter if applicable.
+    Filter exportFilter = getExportFilter(args);
+    if (exportFilter != null) {
+      LOG.info("Setting Scan Filter for Export.");
+      s.setFilter(exportFilter);
+    }
     LOG.info("verisons=" + versions + ", starttime=" + startTime +
       ", endtime=" + endTime);
-    TableMapReduceUtil.initTableMapperJob(tableName, s, Exporter.class, null,
-      null, job);
-    // No reducers.  Just write straight to output files.
-    job.setNumReduceTasks(0);
-    job.setOutputFormatClass(SequenceFileOutputFormat.class);
-    job.setOutputKeyClass(ImmutableBytesWritable.class);
-    job.setOutputValueClass(Result.class);
-    FileOutputFormat.setOutputPath(job, outputDir);
-    return job;
+    return s;
   }
 
+  private static Filter getExportFilter(String[] args) {
+    Filter exportFilter = null;
+    String filterCriteria = (args.length > 5) ? args[5]: null;
+    if (filterCriteria == null) return null;
+    if (filterCriteria.startsWith("^")) {
+      String regexPattern = filterCriteria.substring(1, filterCriteria.length());
+      exportFilter = new RowFilter(CompareOp.EQUAL, new RegexStringComparator(regexPattern));
+    } else {
+      exportFilter = new PrefixFilter(Bytes.toBytes(filterCriteria));
+    }
+    return exportFilter;
+  }
+
   /*
    * @param errorMsg Error message.  Can be null.
    */
@@ -117,7 +150,7 @@
       System.err.println("ERROR: " + errorMsg);
     }
     System.err.println("Usage: Export [-D <property=value>]* <tablename> <outputdir> [<versions> " +
-      "[<starttime> [<endtime>]]]\n");
+      "[<starttime> [<endtime>]] [^[regex pattern] or [Prefix] to filter]]\n");
     System.err.println("  Note: -D properties will be applied to the conf used. ");
     System.err.println("  For example: ");
     System.err.println("   -D mapred.output.compress=true");
@@ -144,4 +177,4 @@
     Job job = createSubmittableJob(conf, otherArgs);
     System.exit(job.waitForCompletion(true)? 0 : 1);
   }
-}
+}
\ No newline at end of file
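
For reference, the filter dispatch introduced by getExportFilter() above works as follows: if the optional sixth command-line argument begins with "^", the remainder is used as a regular expression over row keys (a RowFilter wrapping a RegexStringComparator); any other value is matched as a literal row-key prefix (a PrefixFilter). Below is a minimal standalone sketch of that logic, using only the filter classes the patch imports; the class name ExportFilterDemo and the sample criteria are illustrative and not part of the patch.

import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.filter.RegexStringComparator;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.util.Bytes;

public class ExportFilterDemo {

  // Mirrors the patch's getExportFilter(): a leading '^' selects a regex
  // RowFilter; any other non-null value becomes a row-key PrefixFilter.
  static Filter buildFilter(String criteria) {
    if (criteria == null) return null;
    if (criteria.startsWith("^")) {
      // Strip the '^' marker; the rest is the regex matched against row keys.
      return new RowFilter(CompareOp.EQUAL,
          new RegexStringComparator(criteria.substring(1)));
    }
    // Literal prefix match on the row-key bytes.
    return new PrefixFilter(Bytes.toBytes(criteria));
  }

  public static void main(String[] args) {
    Scan scan = new Scan();
    scan.setFilter(buildFilter("^user-.*"));   // regex on row keys
    // scan.setFilter(buildFilter("user-"));   // literal row-key prefix
    System.out.println(scan.getFilter().getClass().getSimpleName());
  }
}

Because the filter criteria string is read from args[5], it can only be supplied once versions, starttime, and endtime have been given explicitly. A hypothetical invocation exporting only rows whose keys start with "user-" (table and output path names are made up):

  bin/hbase org.apache.hadoop.hbase.mapreduce.Export mytable /export/mytable 1 0 9223372036854775807 user-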