Index: hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java	(revision 1505606)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java	(working copy)
@@ -57,6 +57,7 @@
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -332,6 +333,8 @@
       job.setReducerClass(KeyValueSortReducer.class);
     } else if (Put.class.equals(job.getMapOutputValueClass())) {
       job.setReducerClass(PutSortReducer.class);
+    } else if (Text.class.equals(job.getMapOutputValueClass())) {
+      job.setReducerClass(TextSortReducer.class);
     } else {
       LOG.warn("Unknown map output value type:" + job.getMapOutputValueClass());
     }
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java	(revision 1505606)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java	(working copy)
@@ -42,6 +42,7 @@
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Base64;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
@@ -247,6 +248,31 @@
       }
       private static final long serialVersionUID = 1L;
     }
+
+    public int[] parseRowKey(byte[] lineBytes, int length)
+        throws BadTsvLineException {
+      int rkColumnIndex = 0;
+      int[] rowKeyOffsets = new int[] { 0, 0 };
+      for (int i = 0; i <= length; i++) {
+        if (i == length || lineBytes[i] == separatorByte) {
+          rowKeyOffsets[1] = i - 1;
+          if (rkColumnIndex++ == getRowKeyColumnIndex()) {
+            if ((rowKeyOffsets[1] + 1) == rowKeyOffsets[0]) {
+              throw new BadTsvLineException("Empty value for ROW KEY.");
+            }
+            break;
+          } else {
+            rowKeyOffsets[0] = rowKeyOffsets[1] + 2;
+          }
+        }
+        if (i == length) {
+          throw new BadTsvLineException(
+              "Row key does not exist as number of columns in the line"
+                  + " are less than row key position.");
+        }
+      }
+      return rowKeyOffsets;
+    }
   }
 
   /**
@@ -301,10 +327,22 @@
       Path outputDir = new Path(hfileOutPath);
       FileOutputFormat.setOutputPath(job, outputDir);
       job.setMapOutputKeyClass(ImmutableBytesWritable.class);
-      job.setMapOutputValueClass(Put.class);
-      job.setCombinerClass(PutCombiner.class);
+      if (mapperClass.equals(TsvImporterTextMapper.class)) {
+        job.setMapOutputValueClass(Text.class);
+        job.setReducerClass(TextSortReducer.class);
+      } else {
+        job.setMapOutputValueClass(Put.class);
+        job.setCombinerClass(PutCombiner.class);
+      }
       HFileOutputFormat.configureIncrementalLoad(job, table);
     } else {
+      if (mapperClass.equals(TsvImporterTextMapper.class)) {
+        usage(TsvImporterTextMapper.class.toString()
+            + " should not be used for non bulkloading case. use "
+            + TsvImporterMapper.class.toString()
+            + " or custom mapper whose value type is Put.");
+        System.exit(-1);
+      }
       // No reducers.  Just write straight to table.  Call initTableReducerJob
       // to set up the TableOutputFormat.
       TableMapReduceUtil.initTableReducerJob(tableName, null, job);
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java	(revision 0)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java	(working copy)
@@ -0,0 +1,187 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.TreeSet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Base64;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * Emits sorted KeyValues. Reads the text passed, parses it, creates KeyValues, then
+ * sorts them and emits the KeyValues in sorted order. If there are lots of columns
+ * per row, it will use lots of memory sorting.
+ * @see HFileOutputFormat
+ * @see KeyValueSortReducer
+ * @see PutSortReducer
+ */
+public class TextSortReducer extends
+    Reducer<ImmutableBytesWritable, Text, ImmutableBytesWritable, KeyValue> {
+
+  /** Timestamp for all inserted rows */
+  private long ts;
+
+  /** Column separator */
+  private String separator;
+
+  /** Should skip bad lines */
+  private boolean skipBadLines;
+
+  private Counter badLineCount;
+
+  private ImportTsv.TsvParser parser;
+
+  public long getTs() {
+    return ts;
+  }
+
+  public boolean getSkipBadLines() {
+    return skipBadLines;
+  }
+
+  public Counter getBadLineCount() {
+    return badLineCount;
+  }
+
+  public void incrementBadLineCount(int count) {
+    this.badLineCount.increment(count);
+  }
+
+  /**
+   * Handles initializing this class with objects specific to it (i.e., the parser).
+   * Common initialization that might be leveraged by a subclass is done in
+   * doSetup. Hence a subclass may choose to override this method
+   * and call doSetup as well before handling its own custom params.
+   *
+   * @param context
+   */
+  @Override
+  protected void setup(Context context) {
+    doSetup(context);
+
+    Configuration conf = context.getConfiguration();
+
+    parser = new ImportTsv.TsvParser(conf.get(ImportTsv.COLUMNS_CONF_KEY), separator);
+    if (parser.getRowKeyColumnIndex() == -1) {
+      throw new RuntimeException("No row key column specified");
+    }
+  }
+
+  /**
+   * Handles common parameter initialization that a subclass might want to leverage.
+   * @param context
+   */
+  protected void doSetup(Context context) {
+    Configuration conf = context.getConfiguration();
+
+    // If a custom separator has been used,
+    // decode it back from Base64 encoding.
+    separator = conf.get(ImportTsv.SEPARATOR_CONF_KEY);
+    if (separator == null) {
+      separator = ImportTsv.DEFAULT_SEPARATOR;
+    } else {
+      separator = new String(Base64.decode(separator));
+    }
+
+    // Should never get 0 as we are setting this to a valid value in job configuration.
+    ts = conf.getLong(ImportTsv.TIMESTAMP_CONF_KEY, 0);
+
+    skipBadLines = context.getConfiguration().getBoolean(ImportTsv.SKIP_LINES_CONF_KEY, true);
+    badLineCount = context.getCounter("ImportTsv", "Bad Lines");
+  }
+
+  @Override
+  protected void reduce(
+      ImmutableBytesWritable rowKey,
+      java.lang.Iterable<Text> lines,
+      Reducer<ImmutableBytesWritable, Text, ImmutableBytesWritable, KeyValue>.Context context)
+      throws java.io.IOException, InterruptedException
+  {
+    // although reduce() is called per-row, handle pathological case
+    long threshold = context.getConfiguration().getLong(
+        "reducer.row.threshold", 1L * (1<<30));
+    Iterator<Text> iter = lines.iterator();
+    while (iter.hasNext()) {
+      TreeSet<KeyValue> map = new TreeSet<KeyValue>(KeyValue.COMPARATOR);
+      long curSize = 0;
+      // stop at the end or the RAM threshold
+      while (iter.hasNext() && curSize < threshold) {
+        Text line = iter.next();
+        byte[] lineBytes = line.getBytes();
+        try {
+          ImportTsv.TsvParser.ParsedLine parsed = parser.parse(lineBytes, line.getLength());
+          // Retrieve timestamp if exists
+          ts = parsed.getTimestamp(ts);
+
+          for (int i = 0; i < parsed.getColumnCount(); i++) {
+            if (i == parser.getRowKeyColumnIndex() || i == parser.getTimestampKeyColumnIndex()) {
+              continue;
+            }
+            KeyValue kv = new KeyValue(lineBytes, parsed.getRowKeyOffset(),
+                parsed.getRowKeyLength(), parser.getFamily(i), 0,
+                parser.getFamily(i).length, parser.getQualifier(i), 0,
+                parser.getQualifier(i).length, ts, KeyValue.Type.Put,
+                lineBytes, parsed.getColumnOffset(i), parsed.getColumnLength(i));
+            map.add(kv);
+            curSize += kv.getLength();
+          }
+        } catch (ImportTsv.TsvParser.BadTsvLineException badLine) {
+          if (skipBadLines) {
+            System.err.println("Bad line." + badLine.getMessage());
+            incrementBadLineCount(1);
+            return;
+          } else {
+            throw new IOException(badLine);
+          }
+        } catch (IllegalArgumentException e) {
+          if (skipBadLines) {
+            System.err.println("Bad line." + e.getMessage());
+            incrementBadLineCount(1);
+            return;
+          } else {
+            throw new IOException(e);
+          }
+        }
+      }
+      context.setStatus("Read " + map.size() + " entries of " + map.getClass()
+          + "(" + StringUtils.humanReadableInt(curSize) + ")");
+      int index = 0;
+      for (KeyValue kv : map) {
+        context.write(rowKey, kv);
+        if (++index > 0 && index % 100 == 0)
+          context.setStatus("Wrote " + index);
+      }
+
+      // if we have more entries to process
+      if (iter.hasNext()) {
+        // force flush because we cannot guarantee intra-row sorted order
+        context.write(null, null);
+      }
+    }
+  }
+}
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterTextMapper.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterTextMapper.java	(revision 0)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterTextMapper.java	(working copy)
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Base64;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.IOException;
+
+/**
+ * Write table content out to files in hdfs.
+ */
+public class TsvImporterTextMapper
+    extends Mapper<LongWritable, Text, ImmutableBytesWritable, Text> {
+
+  /** Column separator */
+  private String separator;
+
+  /** Should skip bad lines */
+  private boolean skipBadLines;
+  private Counter badLineCount;
+
+  private ImportTsv.TsvParser parser;
+
+  public boolean getSkipBadLines() {
+    return skipBadLines;
+  }
+
+  public Counter getBadLineCount() {
+    return badLineCount;
+  }
+
+  public void incrementBadLineCount(int count) {
+    this.badLineCount.increment(count);
+  }
+
+  /**
+   * Handles initializing this class with objects specific to it (i.e., the parser).
+   * Common initialization that might be leveraged by a subclass is done in
+   * doSetup. Hence a subclass may choose to override this method
+   * and call doSetup as well before handling its own custom params.
+   *
+   * @param context
+   */
+  @Override
+  protected void setup(Context context) {
+    doSetup(context);
+
+    Configuration conf = context.getConfiguration();
+
+    parser = new ImportTsv.TsvParser(conf.get(ImportTsv.COLUMNS_CONF_KEY), separator);
+    if (parser.getRowKeyColumnIndex() == -1) {
+      throw new RuntimeException("No row key column specified");
+    }
+  }
+
+  /**
+   * Handles common parameter initialization that a subclass might want to leverage.
+   * @param context
+   */
+  protected void doSetup(Context context) {
+    Configuration conf = context.getConfiguration();
+
+    // If a custom separator has been used,
+    // decode it back from Base64 encoding.
+    separator = conf.get(ImportTsv.SEPARATOR_CONF_KEY);
+    if (separator == null) {
+      separator = ImportTsv.DEFAULT_SEPARATOR;
+    } else {
+      separator = new String(Base64.decode(separator));
+    }
+
+    skipBadLines = context.getConfiguration().getBoolean(ImportTsv.SKIP_LINES_CONF_KEY, true);
+    badLineCount = context.getCounter("ImportTsv", "Bad Lines");
+  }
+
+  /**
+   * Convert a line of TSV text into an HBase table row.
+   */
+  @Override
+  public void map(LongWritable offset, Text value, Context context) throws IOException {
+    try {
+      int[] rowKeyOffsets = parser.parseRowKey(value.getBytes(), value.getLength());
+      ImmutableBytesWritable rowKey = new ImmutableBytesWritable(
+          value.getBytes(), rowKeyOffsets[0], rowKeyOffsets[1] - rowKeyOffsets[0] + 1);
+      context.write(rowKey, value);
+    } catch (ImportTsv.TsvParser.BadTsvLineException badLine) {
+      if (skipBadLines) {
+        System.err.println("Bad line at offset: " + offset.get() + ":\n" + badLine.getMessage());
+        incrementBadLineCount(1);
+        return;
+      } else {
+        throw new IOException(badLine);
+      }
+    } catch (IllegalArgumentException e) {
+      if (skipBadLines) {
+        System.err.println("Bad line at offset: " + offset.get() + ":\n" + e.getMessage());
+        incrementBadLineCount(1);
+        return;
+      } else {
+        throw new IOException(e);
+      }
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    }
+  }
+}
Index: hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java	(revision 1505606)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java	(working copy)
@@ -47,7 +47,10 @@
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.Utils.OutputFileUtils.OutputFilesFilter;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.util.GenericOptionsParser;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 import org.junit.AfterClass;
@@ -181,6 +184,49 @@
     util.deleteTable(table);
   }
 
+  @Test
+  public void testJobConfigurationsWithTsvImporterTextMapper() throws Exception {
+    String table = "test-" + UUID.randomUUID();
+    HBaseTestingUtility htu1 = new HBaseTestingUtility();
+    Path bulkOutputPath = new Path(htu1.getDataTestDir(table), "hfiles");
+    // Prepare the arguments required for the test.
+    String[] args =
+        new String[] {
+            "-D" + ImportTsv.MAPPER_CONF_KEY
+                + "=org.apache.hadoop.hbase.mapreduce.TsvImporterTextMapper",
+            "-D" + ImportTsv.COLUMNS_CONF_KEY
+                + "=HBASE_ROW_KEY,FAM:A,FAM:B",
+            "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=,",
+            "-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=" + bulkOutputPath.toString(), table
+        };
+    GenericOptionsParser opts = new GenericOptionsParser(htu1.getConfiguration(), args);
+    args = opts.getRemainingArgs();
+    Job job = ImportTsv.createSubmittableJob(htu1.getConfiguration(), args);
+    assertTrue(job.getMapperClass().equals(TsvImporterTextMapper.class));
+    assertTrue(job.getReducerClass().equals(TextSortReducer.class));
+    assertTrue(job.getMapOutputValueClass().equals(Text.class));
+  }
+
+  @Test
+  public void testBulkOutputWithTsvImporterTextMapper() throws Exception {
+    String table = "test-" + UUID.randomUUID();
+    String FAMILY = "FAM";
+    HBaseTestingUtility htu1 = new HBaseTestingUtility();
+    Path bulkOutputPath = new Path(htu1.getDataTestDir(table), "hfiles");
+    // Prepare the arguments required for the test.
+    String[] args =
+        new String[] {
+            "-D" + ImportTsv.MAPPER_CONF_KEY
+                + "=org.apache.hadoop.hbase.mapreduce.TsvImporterTextMapper",
+            "-D" + ImportTsv.COLUMNS_CONF_KEY
+                + "=HBASE_ROW_KEY,FAM:A,FAM:B",
+            "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b",
+            "-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=" + bulkOutputPath.toString(), table
+        };
+    String data = "KEY\u001bVALUE4\u001bVALUE8\n";
+    doMROnTableTest(htu1, FAMILY, data, args, 4);
+  }
+
 protected static Tool doMROnTableTest(HBaseTestingUtility util, String family, String data,
     String[] args) throws Exception {
   return doMROnTableTest(util, family, data, args, 1);
Index: hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsvParser.java
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsvParser.java	(revision 1505606)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsvParser.java	(working copy)
@@ -186,4 +186,41 @@
     byte[] line = Bytes.toBytes("rowkey\tval_a");
     parser.parse(line, line.length);
   }
+
+  @Test
+  public void testTsvParserParseRowKey() throws BadTsvLineException {
+    TsvParser parser = new TsvParser("HBASE_ROW_KEY,col_a,HBASE_TS_KEY", "\t");
+    assertEquals(0, parser.getRowKeyColumnIndex());
+    byte[] line = Bytes.toBytes("rowkey\tval_a\t1234");
+    int[] rowKeyOffsets = parser.parseRowKey(line, line.length);
+    assertEquals(0, rowKeyOffsets[0]);
+    assertEquals(5, rowKeyOffsets[1]);
+    try {
+      line = Bytes.toBytes("\t\tval_a\t1234");
+      parser.parseRowKey(line, line.length);
+      fail("Should get BadTsvLineException on empty rowkey.");
+    } catch (BadTsvLineException b) {
+
+    }
+    parser = new TsvParser("col_a,HBASE_ROW_KEY,HBASE_TS_KEY", "\t");
+    assertEquals(1, parser.getRowKeyColumnIndex());
+    line = Bytes.toBytes("val_a\trowkey\t1234");
+    rowKeyOffsets = parser.parseRowKey(line, line.length);
+    assertEquals(6, rowKeyOffsets[0]);
+    assertEquals(11, rowKeyOffsets[1]);
+    try {
+      line = Bytes.toBytes("val_a");
+      rowKeyOffsets = parser.parseRowKey(line, line.length);
+      fail("Should get BadTsvLineException when number of columns less than rowkey position.");
+    } catch (BadTsvLineException b) {

+    }
+    parser = new TsvParser("col_a,HBASE_TS_KEY,HBASE_ROW_KEY", "\t");
+    assertEquals(2, parser.getRowKeyColumnIndex());
+    line = Bytes.toBytes("val_a\t1234\trowkey");
+    rowKeyOffsets = parser.parseRowKey(line, line.length);
+    assertEquals(11, rowKeyOffsets[0]);
+    assertEquals(16, rowKeyOffsets[1]);
+  }
+
 }
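
Usage sketch (supplementary note, not part of the patch hunks above): with these changes ImportTsv can emit raw Text from the mapper and defer KeyValue construction to TextSortReducer, but only when a bulk-load output directory is configured. A minimal driver mirroring the configuration exercised by testJobConfigurationsWithTsvImporterTextMapper follows; the table name, input path, and output directory are placeholders, and it assumes the ImportTsv.*_CONF_KEY constants and createSubmittableJob are visible from the caller (the tests access them from the same package; otherwise the literal importtsv.* property names can be used).

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.ImportTsv;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.GenericOptionsParser;

public class ImportTsvTextMapperDriver {
  public static void main(String[] ignored) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    String[] args = new String[] {
        "-D" + ImportTsv.MAPPER_CONF_KEY
            + "=org.apache.hadoop.hbase.mapreduce.TsvImporterTextMapper",
        "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B",
        "-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=/tmp/hfiles", // placeholder output dir
        "myTable",                                              // placeholder table name
        "/tmp/input.tsv"                                        // placeholder input path
    };
    // GenericOptionsParser folds the -D options into conf and returns the
    // remaining positional arguments (table name and input path).
    String[] remainingArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    // With TsvImporterTextMapper the map output value class is Text and the
    // reducer is TextSortReducer; without BULK_OUTPUT_CONF_KEY ImportTsv exits.
    Job job = ImportTsv.createSubmittableJob(conf, remainingArgs);
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}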