diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
index de91829..1e1c620 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
@@ -82,8 +82,10 @@ public class ImportTsv extends Configured implements Tool {
   public final static String SKIP_LINES_CONF_KEY = "importtsv.skip.bad.lines";
   public final static String COLUMNS_CONF_KEY = "importtsv.columns";
   public final static String SEPARATOR_CONF_KEY = "importtsv.separator";
-
+  public final static String ATTRIBUTE_SEPERATOR_CONF_KEY = "attributes.seperator";
   final static String DEFAULT_SEPARATOR = "\t";
+  final static String DEFAULT_ATTRIBUTES_SEPERATOR = "=>";
+  final static String DEFAULT_MULTIPLE_ATTRIBUTES_SEPERATOR = ",";
   final static Class DEFAULT_MAPPER = TsvImporterMapper.class;
 
   public static class TsvParser {
@@ -108,11 +110,20 @@ public class ImportTsv extends Configured implements Tool {
 
     public static final String TIMESTAMPKEY_COLUMN_SPEC = "HBASE_TS_KEY";
 
+    public static final String ATTRIBUTES_COLUMN_SPEC = "HBASE_ATTRIBUTES_KEY";
+
+    private int attrKeyColumnIndex = DEFAULT_ATTRIBUTES_COLUMN_INDEX;
+
+    public static final int DEFAULT_ATTRIBUTES_COLUMN_INDEX = -1;
     /**
      * @param columnsSpecification the list of columns to parser out, comma separated.
      * The row key should be the special token TsvParser.ROWKEY_COLUMN_SPEC
+     * @param tagSeperatorStr
      */
-    public TsvParser(String columnsSpecification, String separatorStr) {
+    public TsvParser(String columnsSpecification, String seperatorStr) {
+      this(columnsSpecification, seperatorStr, null);
+    }
+
+    public TsvParser(String columnsSpecification, String separatorStr, String tagSeperatorStr) {
       // Configure separator
       byte[] separator = Bytes.toBytes(separatorStr);
       Preconditions.checkArgument(separator.length == 1,
@@ -133,12 +144,14 @@ public class ImportTsv extends Configured implements Tool {
           rowKeyColumnIndex = i;
           continue;
         }
-
         if (TIMESTAMPKEY_COLUMN_SPEC.equals(str)) {
           timestampKeyColumnIndex = i;
           continue;
         }
-
+        if (ATTRIBUTES_COLUMN_SPEC.equals(str)) {
+          attrKeyColumnIndex = i;
+          continue;
+        }
         String[] parts = str.split(":", 2);
         if (parts.length == 1) {
           families[i] = str.getBytes();
@@ -158,6 +171,13 @@ public class ImportTsv extends Configured implements Tool {
       return timestampKeyColumnIndex;
     }
 
+    public boolean hasAttributes() {
+      return attrKeyColumnIndex != DEFAULT_ATTRIBUTES_COLUMN_INDEX;
+    }
+
+    public int getAttributesKeyColumnIndex() {
+      return attrKeyColumnIndex;
+    }
     public int getRowKeyColumnIndex() {
       return rowKeyColumnIndex;
     }
@@ -190,6 +210,8 @@ public class ImportTsv extends Configured implements Tool {
       } else if (hasTimestamp()
           && tabOffsets.size() <= getTimestampKeyColumnIndex()) {
         throw new BadTsvLineException("No timestamp");
+      } else if (hasAttributes() && tabOffsets.size() <= getAttributesKeyColumnIndex()) {
+        throw new BadTsvLineException("No attributes specified");
       }
       return new ParsedLine(tabOffsets, lineBytes);
     }
@@ -226,6 +248,41 @@ public class ImportTsv extends Configured implements Tool {
           throw new BadTsvLineException("Invalid timestamp " + timeStampStr);
         }
       }
+
+      private String getAttributes() {
+        if (!hasAttributes()) {
+          return null;
+        } else {
+          return Bytes.toString(lineBytes, getColumnOffset(attrKeyColumnIndex),
+              getColumnLength(attrKeyColumnIndex));
+        }
+      }
+
+      public String[] getIndividualAttributes() {
+        String attributes = getAttributes();
+        if (attributes != null) {
+          return attributes.split(DEFAULT_MULTIPLE_ATTRIBUTES_SEPERATOR);
+        } else {
+          return null;
+        }
+      }
+
+      public int getAttributeKeyOffset() {
+        if (hasAttributes()) {
+          return getColumnOffset(attrKeyColumnIndex);
+        } else {
+          return DEFAULT_ATTRIBUTES_COLUMN_INDEX;
+        }
+      }
+
+      public int getAttributeKeyLength() {
+        if (hasAttributes()) {
+          return getColumnLength(attrKeyColumnIndex);
+        } else {
+          return DEFAULT_ATTRIBUTES_COLUMN_INDEX;
+        }
+      }
+
       public int getColumnOffset(int idx) {
         if (idx > 0)
@@ -398,6 +455,9 @@ public class ImportTsv extends Configured implements Tool {
       "Record with invalid timestamps (blank, non-numeric) will be treated as bad record.\n" +
       "Note: if you use this option, then '" + TIMESTAMP_CONF_KEY + "' option will be ignored.\n" +
       "\n" +
+      TsvParser.ATTRIBUTES_COLUMN_SPEC + " can be used to specify Operation Attributes per record.\n" +
+      " Should be specified as key=>value where " + DEFAULT_ATTRIBUTES_SEPERATOR + " is used \n" +
+      " as the separator. Note that more than one OperationAttribute can be specified.\n" +
       "By default importtsv will load data directly into HBase. To instead generate\n" +
       "HFiles of data to prepare for a bulk data load, pass the option:\n" +
       "  -D" + BULK_OUTPUT_CONF_KEY + "=/path/for/output\n" +
@@ -460,10 +520,21 @@ public class ImportTsv extends Configured implements Tool {
           + TsvParser.TIMESTAMPKEY_COLUMN_SPEC);
       return -1;
     }
+
+    int attrKeysFound = 0;
+    for (String col : columns) {
+      if (col.equals(TsvParser.ATTRIBUTES_COLUMN_SPEC))
+        attrKeysFound++;
+    }
+    if (attrKeysFound > 1) {
+      usage("Must specify at most one column as "
+          + TsvParser.ATTRIBUTES_COLUMN_SPEC);
+      return -1;
+    }
     // Make sure one or more columns are specified excluding rowkey and
     // timestamp key
-    if (columns.length - (rowkeysFound + tskeysFound) < 1) {
+    if (columns.length - (rowkeysFound + tskeysFound + attrKeysFound) < 1) {
       usage("One or more columns in addition to the row key and timestamp(optional) are required");
       return -1;
     }
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java
index 6360b2e..9e14212 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java
@@ -17,19 +17,20 @@
  */
 package org.apache.hadoop.hbase.mapreduce;
 
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.util.Base64;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.Counter;
+import java.io.IOException;
+
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
-
-import java.io.IOException;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser.BadTsvLineException;
+import org.apache.hadoop.hbase.util.Base64;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.Mapper;
 
 /**
  * Write table content out to files in hdfs.
@@ -41,7 +42,7 @@ extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put>
 {
 
   /** Timestamp for all inserted rows */
-  private long ts;
+  protected long ts;
 
   /** Column seperator */
   private String separator;
@@ -50,7 +51,9 @@ extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put>
 
   private boolean skipBadLines;
   private Counter badLineCount;
 
-  private ImportTsv.TsvParser parser;
+  protected ImportTsv.TsvParser parser;
+
+  protected Configuration conf;
 
   public long getTs() {
     return ts;
@@ -80,8 +83,7 @@ extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put>
   protected void setup(Context context) {
     doSetup(context);
 
-    Configuration conf = context.getConfiguration();
-
+    conf = context.getConfiguration();
     parser = new ImportTsv.TsvParser(conf.get(ImportTsv.COLUMNS_CONF_KEY),
                            separator);
     if (parser.getRowKeyColumnIndex() == -1) {
@@ -104,7 +106,6 @@ extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put>
     } else {
       separator = new String(Base64.decode(separator));
     }
-
     // Should never get 0 as we are setting this to a valid value in job
     // configuration.
     ts = conf.getLong(ImportTsv.TIMESTAMP_CONF_KEY, 0);
@@ -135,18 +136,11 @@ extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put>
 
       Put put = new Put(rowKey.copyBytes());
       for (int i = 0; i < parsed.getColumnCount(); i++) {
-        if (i == parser.getRowKeyColumnIndex()
-            || i == parser.getTimestampKeyColumnIndex()) {
+        if (i == parser.getRowKeyColumnIndex() || i == parser.getTimestampKeyColumnIndex()
+            || i == parser.getAttributesKeyColumnIndex()) {
           continue;
         }
-        KeyValue kv = new KeyValue(
-            lineBytes, parsed.getRowKeyOffset(), parsed.getRowKeyLength(),
-            parser.getFamily(i), 0, parser.getFamily(i).length,
-            parser.getQualifier(i), 0, parser.getQualifier(i).length,
-            ts,
-            KeyValue.Type.Put,
-            lineBytes, parsed.getColumnOffset(i), parsed.getColumnLength(i));
-        put.add(kv);
+        KeyValue kv = createPuts(lineBytes, parsed, put, i);
       }
       context.write(rowKey, put);
     } catch (ImportTsv.TsvParser.BadTsvLineException badLine) {
@@ -173,4 +167,15 @@ extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put>
       e.printStackTrace();
     }
   }
+
+  protected KeyValue createPuts(byte[] lineBytes, ImportTsv.TsvParser.ParsedLine parsed, Put put,
+      int i) throws BadTsvLineException, IOException {
+    KeyValue kv;
+    kv = new KeyValue(lineBytes, parsed.getRowKeyOffset(), parsed.getRowKeyLength(),
+        parser.getFamily(i), 0, parser.getFamily(i).length, parser.getQualifier(i), 0,
+        parser.getQualifier(i).length, ts, KeyValue.Type.Put, lineBytes, parsed.getColumnOffset(i),
+        parsed.getColumnLength(i));
+    put.add(kv);
+    return kv;
+  }
 }
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTSVWithOperationAttributes.java hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTSVWithOperationAttributes.java
new file mode 100644
index 0000000..3ae585b
--- /dev/null
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTSVWithOperationAttributes.java
@@ -0,0 +1,256 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.LargeTests;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(LargeTests.class)
+public class TestImportTSVWithOperationAttributes implements Configurable {
+
+  protected static final Log LOG = LogFactory.getLog(TestImportTSVWithOperationAttributes.class);
+  protected static final String NAME = TestImportTsv.class.getSimpleName();
+  protected static HBaseTestingUtility util = new HBaseTestingUtility();
+
+  /**
+   * Delete the tmp directory after running doMROnTableTest. Boolean. Default is
+   * false.
+   */
+  protected static final String DELETE_AFTER_LOAD_CONF = NAME + ".deleteAfterLoad";
+
+  /**
+   * Force use of combiner in doMROnTableTest. Boolean. Default is true.
+   */
+  protected static final String FORCE_COMBINER_CONF = NAME + ".forceCombiner";
+
+  private static Configuration conf;
+
+  private static final String TEST_ATR_KEY = "test";
+
+  private final String FAMILY = "FAM";
+
+  public Configuration getConf() {
+    return util.getConfiguration();
+  }
+
+  public void setConf(Configuration conf) {
+    throw new IllegalArgumentException("setConf not supported");
+  }
+
+  @BeforeClass
+  public static void provisionCluster() throws Exception {
+    conf = util.getConfiguration();
+    conf.set("hbase.coprocessor.master.classes", OperationAttributesTestController.class.getName());
+    conf.set("hbase.coprocessor.region.classes", OperationAttributesTestController.class.getName());
+    util.startMiniCluster();
+    HBaseAdmin admin = new HBaseAdmin(util.getConfiguration());
+    util.startMiniMapReduceCluster();
+  }
+
+  @AfterClass
+  public static void releaseCluster() throws Exception {
+    util.shutdownMiniMapReduceCluster();
+    util.shutdownMiniCluster();
+  }
+
+  @Test
+  public void testMROnTable() throws Exception {
+    String tableName = "test-" + UUID.randomUUID();
+
+    // Prepare the arguments required for the test.
+    String[] args = new String[] {
+        "-D" + ImportTsv.MAPPER_CONF_KEY
+            + "=org.apache.hadoop.hbase.mapreduce.TsvImporterCustomTestMapperForOprAttr",
+        "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B,HBASE_ATTRIBUTES_KEY",
+        "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b", tableName };
+    String data = "KEY\u001bVALUE1\u001bVALUE2\u001btest=>myvalue\n";
+    util.createTable(tableName, FAMILY);
+    doMROnTableTest(util, FAMILY, data, args, 1, true);
+    util.deleteTable(tableName);
+  }
+
+  @Test
+  public void testMROnTableWithInvalidOperationAttr() throws Exception {
+    String tableName = "test-" + UUID.randomUUID();
+
+    // Prepare the arguments required for the test.
+    String[] args = new String[] {
+        "-D" + ImportTsv.MAPPER_CONF_KEY
+            + "=org.apache.hadoop.hbase.mapreduce.TsvImporterCustomTestMapperForOprAttr",
+        "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B,HBASE_ATTRIBUTES_KEY",
+        "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b", tableName };
+    String data = "KEY\u001bVALUE1\u001bVALUE2\u001btest1=>myvalue\n";
+    util.createTable(tableName, FAMILY);
+    doMROnTableTest(util, FAMILY, data, args, 1, false);
+    util.deleteTable(tableName);
+  }
+
+  /**
+   * Run an ImportTsv job and perform basic validation on the results. Returns
+   * the ImportTsv Tool instance so that other tests can inspect it
+   * for further validation as necessary. This method does not rely on the
+   * instance's util/conf facilities.
+   *
+   * @param args
+   *          Any arguments to pass BEFORE inputFile path is appended.
+   * @param dataAvailable
+   * @return The Tool instance used to run the test.
+   */
+  private Tool doMROnTableTest(HBaseTestingUtility util, String family, String data, String[] args,
+      int valueMultiplier, boolean dataAvailable) throws Exception {
+    String table = args[args.length - 1];
+    Configuration conf = new Configuration(util.getConfiguration());
+
+    // populate input file
+    FileSystem fs = FileSystem.get(conf);
+    Path inputPath = fs.makeQualified(new Path(util.getDataTestDirOnTestFS(table), "input.dat"));
+    FSDataOutputStream op = fs.create(inputPath, true);
+    op.write(Bytes.toBytes(data));
+    op.close();
+    LOG.debug(String.format("Wrote test data to file: %s", inputPath));
+
+    if (conf.getBoolean(FORCE_COMBINER_CONF, true)) {
+      LOG.debug("Forcing combiner.");
+      conf.setInt("min.num.spills.for.combine", 1);
+    }
+
+    // run the import
+    List<String> argv = new ArrayList<String>(Arrays.asList(args));
+    argv.add(inputPath.toString());
+    Tool tool = new ImportTsv();
+    LOG.debug("Running ImportTsv with arguments: " + argv);
+    assertEquals(0, ToolRunner.run(conf, tool, argv.toArray(args)));
+
+    validateTable(conf, table, family, valueMultiplier, dataAvailable);
+
+    if (conf.getBoolean(DELETE_AFTER_LOAD_CONF, true)) {
+      LOG.debug("Deleting test subdirectory");
+      util.cleanupDataTestDirOnTestFS(table);
+    }
+    return tool;
+  }
+
+  /**
+   * Confirm ImportTsv via data in online table.
+   *
+   * @param dataAvailable
+   */
+  private static void validateTable(Configuration conf, String tableName, String family,
+      int valueMultiplier, boolean dataAvailable) throws IOException {
+
+    LOG.debug("Validating table.");
+    HTable table = new HTable(conf, tableName);
+    boolean verified = false;
+    long pause = conf.getLong("hbase.client.pause", 5 * 1000);
+    int numRetries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5);
+    for (int i = 0; i < numRetries; i++) {
+      try {
+        Scan scan = new Scan();
+        // Scan entire family.
+        scan.addFamily(Bytes.toBytes(family));
+        if (dataAvailable) {
+          ResultScanner resScanner = table.getScanner(scan);
+          for (Result res : resScanner) {
+            LOG.debug("Getting results " + res.size());
+            assertTrue(res.size() == 2);
+            List<Cell> kvs = res.listCells();
+            assertTrue(CellUtil.matchingRow(kvs.get(0), Bytes.toBytes("KEY")));
+            assertTrue(CellUtil.matchingRow(kvs.get(1), Bytes.toBytes("KEY")));
+            assertTrue(CellUtil.matchingValue(kvs.get(0), Bytes.toBytes("VALUE" + valueMultiplier)));
+            assertTrue(CellUtil.matchingValue(kvs.get(1),
+                Bytes.toBytes("VALUE" + 2 * valueMultiplier)));
+            // Only one result set is expected, so let it loop.
+            verified = true;
+          }
+        } else {
+          ResultScanner resScanner = table.getScanner(scan);
+          Result[] next = resScanner.next(2);
+          assertEquals(0, next.length);
+          verified = true;
+        }
+
+        break;
+      } catch (NullPointerException e) {
+        // If here, a cell was empty. Presume it's because updates came in
+        // after the scanner had been opened. Wait a while and retry.
+      }
+      try {
+        Thread.sleep(pause);
+      } catch (InterruptedException e) {
+        // continue
+      }
+    }
+    table.close();
+    assertTrue(verified);
+  }
+
+  public static class OperationAttributesTestController extends BaseRegionObserver {
+
+    @Override
+    public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit,
+        Durability durability) throws IOException {
+      HRegion region = e.getEnvironment().getRegion();
+      if (!region.getRegionInfo().isMetaTable()
+          && !region.getRegionInfo().getTable().isSystemTable()) {
+        if (put.getAttribute(TEST_ATR_KEY) != null) {
+          LOG.debug("allow any put to happen " + region.getRegionNameAsString());
+        } else {
+          e.bypass();
+        }
+      }
+    }
+  }
+}
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsvParser.java hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsvParser.java
index edc927b..474c388 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsvParser.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsvParser.java
@@ -54,7 +54,7 @@ public class TestImportTsvParser {
     ArrayList<String> parsedCols = new ArrayList<String>();
     for (int i = 0; i < parsed.getColumnCount(); i++) {
       parsedCols.add(Bytes.toString(parsed.getLineBytes(), parsed.getColumnOffset(i),
-          parsed.getColumnLength(i)));
+        parsed.getColumnLength(i)));
     }
     if (!Iterables.elementsEqual(parsedCols, expected)) {
       fail("Expected: " + Joiner.on(",").join(expected) + "\n" + "Got:"
@@ -100,6 +100,32 @@ public class TestImportTsvParser {
     assertEquals(0, parser.getRowKeyColumnIndex());
     assertTrue(parser.hasTimestamp());
     assertEquals(2, parser.getTimestampKeyColumnIndex());
+
+    parser = new TsvParser("HBASE_ROW_KEY,col1:scol1,HBASE_TS_KEY,col1:scol2,HBASE_ATTRIBUTES_KEY",
+        "\t");
+    assertNull(parser.getFamily(0));
+    assertNull(parser.getQualifier(0));
+    assertBytesEquals(Bytes.toBytes("col1"), parser.getFamily(1));
+    assertBytesEquals(Bytes.toBytes("scol1"), parser.getQualifier(1));
+    assertBytesEquals(Bytes.toBytes("col1"), parser.getFamily(3));
+    assertBytesEquals(Bytes.toBytes("scol2"), parser.getQualifier(3));
+    assertEquals(0, parser.getRowKeyColumnIndex());
+    assertTrue(parser.hasTimestamp());
+    assertEquals(2, parser.getTimestampKeyColumnIndex());
+    assertEquals(4, parser.getAttributesKeyColumnIndex());
+
+    parser = new TsvParser("HBASE_ATTRIBUTES_KEY,col1:scol1,HBASE_TS_KEY,col1:scol2,HBASE_ROW_KEY",
+        "\t");
+    assertNull(parser.getFamily(0));
+    assertNull(parser.getQualifier(0));
+    assertBytesEquals(Bytes.toBytes("col1"), parser.getFamily(1));
+    assertBytesEquals(Bytes.toBytes("scol1"), parser.getQualifier(1));
+    assertBytesEquals(Bytes.toBytes("col1"), parser.getFamily(3));
+    assertBytesEquals(Bytes.toBytes("scol2"), parser.getQualifier(3));
+    assertEquals(4, parser.getRowKeyColumnIndex());
+    assertTrue(parser.hasTimestamp());
+    assertEquals(2, parser.getTimestampKeyColumnIndex());
+    assertEquals(0, parser.getAttributesKeyColumnIndex());
   }
 
   @Test
@@ -113,8 +139,7 @@ public class TestImportTsvParser {
     assertNull(parser.getQualifier(2));
     assertEquals(2, parser.getRowKeyColumnIndex());
 
-    assertEquals(TsvParser.DEFAULT_TIMESTAMP_COLUMN_INDEX,
-        parser.getTimestampKeyColumnIndex());
+    assertEquals(TsvParser.DEFAULT_TIMESTAMP_COLUMN_INDEX, parser.getTimestampKeyColumnIndex());
 
     byte[] line = Bytes.toBytes("val_a\tval_b\tval_c\tval_d");
     ParsedLine parsed = parser.parse(line, line.length);
@@ -187,14 +212,13 @@ public class TestImportTsvParser {
     byte[] line = Bytes.toBytes("rowkey\tval_a");
     parser.parse(line, line.length);
   }
-
+
   @Test
   public void testTsvParserParseRowKey() throws BadTsvLineException {
     TsvParser parser = new TsvParser("HBASE_ROW_KEY,col_a,HBASE_TS_KEY", "\t");
     assertEquals(0, parser.getRowKeyColumnIndex());
     byte[] line = Bytes.toBytes("rowkey\tval_a\t1234");
-    Pair<Integer, Integer> rowKeyOffsets = parser
-        .parseRowKey(line, line.length);
+    Pair<Integer, Integer> rowKeyOffsets = parser.parseRowKey(line, line.length);
     assertEquals(0, rowKeyOffsets.getFirst().intValue());
     assertEquals(5, rowKeyOffsets.getSecond().intValue());
     try {
@@ -225,4 +249,50 @@
     assertEquals(16, rowKeyOffsets.getSecond().intValue());
   }
 
+  @Test
+  public void testTsvParseAttributesKey() throws BadTsvLineException {
+    TsvParser parser = new TsvParser("HBASE_ROW_KEY,col_a,HBASE_TS_KEY,HBASE_ATTRIBUTES_KEY", "\t");
+    assertEquals(0, parser.getRowKeyColumnIndex());
+    byte[] line = Bytes.toBytes("rowkey\tval_a\t1234\tkey=>value");
+    ParsedLine parse = parser.parse(line, line.length);
+    assertEquals(18, parse.getAttributeKeyOffset());
+    assertEquals(3, parser.getAttributesKeyColumnIndex());
+    String attributes[] = parse.getIndividualAttributes();
+    assertEquals(attributes[0], "key=>value");
+    try {
+      line = Bytes.toBytes("rowkey\tval_a\t1234");
+      parser.parse(line, line.length);
+      fail("Should get BadTsvLineException on missing attributes column.");
+    } catch (BadTsvLineException b) {
+
+    }
+    parser = new TsvParser("HBASE_ATTRIBUTES_KEY,col_a,HBASE_ROW_KEY,HBASE_TS_KEY", "\t");
+    assertEquals(2, parser.getRowKeyColumnIndex());
+    line = Bytes.toBytes("key=>value\tval_a\trowkey\t1234");
+    parse = parser.parse(line, line.length);
+    assertEquals(0, parse.getAttributeKeyOffset());
+    assertEquals(0, parser.getAttributesKeyColumnIndex());
+    attributes = parse.getIndividualAttributes();
+    assertEquals(attributes[0], "key=>value");
+    try {
+      line = Bytes.toBytes("val_a");
+      ParsedLine parse2 = parser.parse(line, line.length);
+      fail("Should get BadTsvLineException when the number of columns is less than the row key position.");
+    } catch (BadTsvLineException b) {
+
+    }
+    parser = new TsvParser("col_a,HBASE_ATTRIBUTES_KEY,HBASE_TS_KEY,HBASE_ROW_KEY", "\t");
+    assertEquals(3, parser.getRowKeyColumnIndex());
+    line = Bytes.toBytes("val_a\tkey0=>value0,key1=>value1,key2=>value2\t1234\trowkey");
+    parse = parser.parse(line, line.length);
+    assertEquals(1, parser.getAttributesKeyColumnIndex());
+    assertEquals(6, parse.getAttributeKeyOffset());
+    String[] attr = parse.getIndividualAttributes();
+    int i = 0;
+    for (String str : attr) {
+      assertEquals(("key" + i + "=>" + "value" + i), str);
+      i++;
+    }
+  }
+
 }
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TsvImporterCustomTestMapperForOprAttr.java hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TsvImporterCustomTestMapperForOprAttr.java
new file mode 100644
index 0000000..7fed83f
--- /dev/null
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TsvImporterCustomTestMapperForOprAttr.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser.BadTsvLineException;
+import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser.ParsedLine;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Just shows a simple example of how the attributes can be extracted and added
+ * to the puts.
+ */
+public class TsvImporterCustomTestMapperForOprAttr extends TsvImporterMapper {
+  @Override
+  protected KeyValue createPuts(byte[] lineBytes, ParsedLine parsed, Put put, int i)
+      throws BadTsvLineException, IOException {
+    KeyValue kv;
+    kv = new KeyValue(lineBytes, parsed.getRowKeyOffset(), parsed.getRowKeyLength(),
+        parser.getFamily(i), 0, parser.getFamily(i).length, parser.getQualifier(i), 0,
+        parser.getQualifier(i).length, ts, KeyValue.Type.Put, lineBytes, parsed.getColumnOffset(i),
+        parsed.getColumnLength(i));
+    if (parsed.getIndividualAttributes() != null) {
+      String[] attributes = parsed.getIndividualAttributes();
+      for (String attr : attributes) {
+        String[] split = attr.split(ImportTsv.DEFAULT_ATTRIBUTES_SEPERATOR);
+        if (split == null || split.length <= 1) {
+          throw new BadTsvLineException("Invalid attributes separator specified: " + attr);
+        } else {
+          if (split[0].length() <= 0 || split[1].length() <= 0) {
+            throw new BadTsvLineException("Invalid attribute key or value specified: " + attr);
+          }
+          put.setAttribute(split[0], Bytes.toBytes(split[1]));
+        }
+      }
+    }
+    put.add(kv);
+    return kv;
+  }
+}
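
For reviewers, below is a minimal sketch (not part of the patch) of how the attribute plumbing added above can be exercised directly. The class name and the sample row/attribute values are invented for illustration, and the sketch assumes it is compiled into the org.apache.hadoop.hbase.mapreduce package, since DEFAULT_ATTRIBUTES_SEPERATOR is package-private.

// Hypothetical example class, not part of this patch.
package org.apache.hadoop.hbase.mapreduce;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser;
import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser.ParsedLine;
import org.apache.hadoop.hbase.util.Bytes;

public class TsvAttributesExample {
  public static void main(String[] args) throws Exception {
    // Column spec: row key, one data column, then the per-record attributes column.
    TsvParser parser = new TsvParser("HBASE_ROW_KEY,d:c1,HBASE_ATTRIBUTES_KEY", "\t");
    byte[] line = Bytes.toBytes("row1\tvalue1\tenv=>dev,test=>myvalue");
    ParsedLine parsed = parser.parse(line, line.length);

    Put put = new Put(Bytes.toBytes("row1"));
    // Attributes arrive as "key=>value" tokens, comma separated when there are several.
    for (String attr : parsed.getIndividualAttributes()) {
      String[] kv = attr.split(ImportTsv.DEFAULT_ATTRIBUTES_SEPERATOR);
      put.setAttribute(kv[0], Bytes.toBytes(kv[1]));
    }
    System.out.println("attributes set: " + put.getAttributesMap().keySet());
  }
}

In an actual import job the equivalent extraction is done by a custom mapper such as TsvImporterCustomTestMapperForOprAttr, selected with a -D option for ImportTsv.MAPPER_CONF_KEY as the new tests do.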