From 0d36889ef6c07fd656ef23aeb1dd74d3c1007782 Mon Sep 17 00:00:00 2001
From: Andrew Purtell
Date: Mon, 17 Nov 2014 14:35:35 -0800
Subject: [PATCH] HBASE-12174 MR support for cell TTLs

---
 .../apache/hadoop/hbase/mapreduce/CellCreator.java |  35 ++++-
 .../apache/hadoop/hbase/mapreduce/ImportTsv.java   |  55 ++++++-
 .../hadoop/hbase/mapreduce/TextSortReducer.java    |  24 ++-
 .../hadoop/hbase/mapreduce/TsvImporterMapper.java  |  26 +++-
 .../hbase/mapreduce/TestImportTSVWithTTLs.java     | 173 +++++++++++++++++++++
 5 files changed, 305 insertions(+), 8 deletions(-)
 create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTSVWithTTLs.java

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CellCreator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CellCreator.java
index b3dfee7..001f64d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CellCreator.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CellCreator.java
@@ -69,7 +69,7 @@ public class CellCreator {
       byte[] qualifier, int qoffset, int qlength, long timestamp, byte[] value, int voffset,
       int vlength) throws IOException {
     return create(row, roffset, rlength, family, foffset, flength, qualifier, qoffset, qlength,
-        timestamp, value, voffset, vlength, null);
+        timestamp, value, voffset, vlength, (List<Tag>) null);
   }
 
   /**
@@ -90,6 +90,7 @@ public class CellCreator {
    * @return created Cell
    * @throws IOException
    */
+  @Deprecated
   public Cell create(byte[] row, int roffset, int rlength, byte[] family, int foffset, int flength,
       byte[] qualifier, int qoffset, int qlength, long timestamp, byte[] value, int voffset,
       int vlength, String visExpression) throws IOException {
@@ -100,4 +101,36 @@ public class CellCreator {
     return new KeyValue(row, roffset, rlength, family, foffset, flength, qualifier, qoffset,
         qlength, timestamp, KeyValue.Type.Put, value, voffset, vlength, visTags);
   }
+
+  /**
+   * @param row row key
+   * @param roffset row offset
+   * @param rlength row length
+   * @param family family name
+   * @param foffset family offset
+   * @param flength family length
+   * @param qualifier column qualifier
+   * @param qoffset qualifier offset
+   * @param qlength qualifier length
+   * @param timestamp version timestamp
+   * @param value column value
+   * @param voffset value offset
+   * @param vlength value length
+   * @param tags tags to attach to the cell, if any
+   * @return created Cell
+   * @throws IOException
+   */
+  public Cell create(byte[] row, int roffset, int rlength, byte[] family, int foffset, int flength,
+      byte[] qualifier, int qoffset, int qlength, long timestamp, byte[] value, int voffset,
+      int vlength, List<Tag> tags) throws IOException {
+    return new KeyValue(row, roffset, rlength, family, foffset, flength, qualifier, qoffset,
+        qlength, timestamp, KeyValue.Type.Put, value, voffset, vlength, tags);
+  }
+
+  /**
+   * @return Visibility expression resolver
+   */
+  public VisibilityExpressionResolver getVisibilityExpressionResolver() {
+    return this.visExpResolver;
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
index f586523..7d8914f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
@@ -123,13 +123,20 @@ public class ImportTsv extends Configured implements Tool {
 
     public static final String CELL_VISIBILITY_COLUMN_SPEC = "HBASE_CELL_VISIBILITY";
 
+    public static final String CELL_TTL_COLUMN_SPEC = "HBASE_CELL_TTL";
+
     private int attrKeyColumnIndex = DEFAULT_ATTRIBUTES_COLUMN_INDEX;
 
     public static final int DEFAULT_ATTRIBUTES_COLUMN_INDEX = -1;
 
     public static final int DEFAULT_CELL_VISIBILITY_COLUMN_INDEX = -1;
 
+    public static final int DEFAULT_CELL_TTL_COLUMN_INDEX = -1;
+
     private int cellVisibilityColumnIndex = DEFAULT_CELL_VISIBILITY_COLUMN_INDEX;
+
+    private int cellTTLColumnIndex = DEFAULT_CELL_TTL_COLUMN_INDEX;
+
     /**
      * @param columnsSpecification the list of columns to parser out, comma separated.
      * The row key should be the special token TsvParser.ROWKEY_COLUMN_SPEC
@@ -160,14 +167,18 @@
           timestampKeyColumnIndex = i;
           continue;
         }
-        if(ATTRIBUTES_COLUMN_SPEC.equals(str)) {
+        if (ATTRIBUTES_COLUMN_SPEC.equals(str)) {
           attrKeyColumnIndex = i;
           continue;
         }
-        if(CELL_VISIBILITY_COLUMN_SPEC.equals(str)) {
+        if (CELL_VISIBILITY_COLUMN_SPEC.equals(str)) {
           cellVisibilityColumnIndex = i;
           continue;
         }
+        if (CELL_TTL_COLUMN_SPEC.equals(str)) {
+          cellTTLColumnIndex = i;
+          continue;
+        }
         String[] parts = str.split(":", 2);
         if (parts.length == 1) {
           families[i] = str.getBytes();
@@ -195,6 +206,10 @@
       return cellVisibilityColumnIndex != DEFAULT_CELL_VISIBILITY_COLUMN_INDEX;
     }
 
+    public boolean hasCellTTL() {
+      return cellTTLColumnIndex != DEFAULT_CELL_TTL_COLUMN_INDEX;
+    }
+
     public int getAttributesKeyColumnIndex() {
       return attrKeyColumnIndex;
     }
@@ -202,9 +217,15 @@
     public int getCellVisibilityColumnIndex() {
       return cellVisibilityColumnIndex;
    }
+
+    public int getCellTTLColumnIndex() {
+      return cellTTLColumnIndex;
+    }
+
     public int getRowKeyColumnIndex() {
       return rowKeyColumnIndex;
     }
+
     public byte[] getFamily(int idx) {
       return families[idx];
     }
@@ -236,8 +257,10 @@
         throw new BadTsvLineException("No timestamp");
       } else if (hasAttributes() && tabOffsets.size() <= getAttributesKeyColumnIndex()) {
         throw new BadTsvLineException("No attributes specified");
-      } else if(hasCellVisibility() && tabOffsets.size() <= getCellVisibilityColumnIndex()) {
+      } else if (hasCellVisibility() && tabOffsets.size() <= getCellVisibilityColumnIndex()) {
         throw new BadTsvLineException("No cell visibility specified");
+      } else if (hasCellTTL() && tabOffsets.size() <= getCellTTLColumnIndex()) {
+        throw new BadTsvLineException("No cell TTL specified");
       }
       return new ParsedLine(tabOffsets, lineBytes);
     }
@@ -334,6 +357,31 @@
         }
       }
 
+      public int getCellTTLColumnOffset() {
+        if (hasCellTTL()) {
+          return getColumnOffset(cellTTLColumnIndex);
+        } else {
+          return DEFAULT_CELL_TTL_COLUMN_INDEX;
+        }
+      }
+
+      public int getCellTTLColumnLength() {
+        if (hasCellTTL()) {
+          return getColumnLength(cellTTLColumnIndex);
+        } else {
+          return DEFAULT_CELL_TTL_COLUMN_INDEX;
+        }
+      }
+
+      public long getCellTTL() {
+        if (!hasCellTTL()) {
+          return 0;
+        } else {
+          return Bytes.toLong(lineBytes, getColumnOffset(cellTTLColumnIndex),
+              getColumnLength(cellTTLColumnIndex));
+        }
+      }
+
       public int getColumnOffset(int idx) {
         if (idx > 0)
           return tabOffsets.get(idx - 1) + 1;
@@ -492,6 +540,7 @@
         if (TsvParser.ROWKEY_COLUMN_SPEC.equals(aColumn)
             || TsvParser.TIMESTAMPKEY_COLUMN_SPEC.equals(aColumn)
             || TsvParser.CELL_VISIBILITY_COLUMN_SPEC.equals(aColumn)
+            || TsvParser.CELL_TTL_COLUMN_SPEC.equals(aColumn)
             || TsvParser.ATTRIBUTES_COLUMN_SPEC.equals(aColumn))
           continue;
         // we are only concerned with the first one (in case this is a cf:cq)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java
index 4a0e0fd..b3981a1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java
@@ -18,7 +18,9 @@
 package org.apache.hadoop.hbase.mapreduce;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Set;
 import java.util.TreeSet;
 
@@ -28,8 +30,11 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.TagType;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Base64;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Counter;
 import org.apache.hadoop.mapreduce.Reducer;
@@ -62,6 +67,9 @@ public class TextSortReducer extends
   /** Cell visibility expr **/
   private String cellVisibilityExpr;
 
+  /** Cell TTL */
+  private long ttl;
+
   private CellCreator kvCreator;
 
   public long getTs() {
@@ -148,18 +156,30 @@ public class TextSortReducer extends
         // Retrieve timestamp if exists
         ts = parsed.getTimestamp(ts);
         cellVisibilityExpr = parsed.getCellVisibility();
+        ttl = parsed.getCellTTL();
 
         for (int i = 0; i < parsed.getColumnCount(); i++) {
           if (i == parser.getRowKeyColumnIndex() || i == parser.getTimestampKeyColumnIndex()
-              || i == parser.getAttributesKeyColumnIndex() || i == parser.getCellVisibilityColumnIndex()) {
+              || i == parser.getAttributesKeyColumnIndex() || i == parser.getCellVisibilityColumnIndex()
+              || i == parser.getCellTTLColumnIndex()) {
             continue;
           }
           // Creating the KV which needs to be directly written to HFiles. Using the Facade
           // KVCreator for creation of kvs.
+          List<Tag> tags = new ArrayList<Tag>();
+          if (cellVisibilityExpr != null) {
+            tags.addAll(kvCreator.getVisibilityExpressionResolver()
+                .createVisibilityExpTags(cellVisibilityExpr));
+          }
+          // Add TTL directly to the KV so we can vary them when packing more than one KV
+          // into puts
+          if (ttl > 0) {
+            tags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(ttl)));
+          }
           Cell cell = this.kvCreator.create(lineBytes, parsed.getRowKeyOffset(),
               parsed.getRowKeyLength(), parser.getFamily(i), 0, parser.getFamily(i).length,
               parser.getQualifier(i), 0, parser.getQualifier(i).length, ts, lineBytes,
-              parsed.getColumnOffset(i), parsed.getColumnLength(i), cellVisibilityExpr);
+              parsed.getColumnOffset(i), parsed.getColumnLength(i), tags);
           KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
           kvs.add(kv);
           curSize += kv.heapSize();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java
index ff84081..270de75 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java
@@ -18,17 +18,22 @@
 package org.apache.hadoop.hbase.mapreduce;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.TagType;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser.BadTsvLineException;
 import org.apache.hadoop.hbase.security.visibility.CellVisibility;
 import org.apache.hadoop.hbase.util.Base64;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Counter;
@@ -59,6 +64,8 @@ extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put>
 
   protected String cellVisibilityExpr;
 
+  protected long ttl;
+
   protected CellCreator kvCreator;
 
   private String hfileOutPath;
@@ -144,11 +151,13 @@ extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put>
       // Retrieve timestamp if exists
       ts = parsed.getTimestamp(ts);
       cellVisibilityExpr = parsed.getCellVisibility();
+      ttl = parsed.getCellTTL();
 
       Put put = new Put(rowKey.copyBytes());
       for (int i = 0; i < parsed.getColumnCount(); i++) {
         if (i == parser.getRowKeyColumnIndex() || i == parser.getTimestampKeyColumnIndex()
-            || i == parser.getAttributesKeyColumnIndex() || i == parser.getCellVisibilityColumnIndex()) {
+            || i == parser.getAttributesKeyColumnIndex() || i == parser.getCellVisibilityColumnIndex()
+            || i == parser.getCellTTLColumnIndex()) {
           continue;
         }
         populatePut(lineBytes, parsed, put, i);
@@ -192,13 +201,26 @@ extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put>
         // the validation
         put.setCellVisibility(new CellVisibility(cellVisibilityExpr));
       }
+      if (ttl > 0) {
+        put.setTTL(ttl);
+      }
     } else {
       // Creating the KV which needs to be directly written to HFiles. Using the Facade
       // KVCreator for creation of kvs.
+      List<Tag> tags = new ArrayList<Tag>();
+      if (cellVisibilityExpr != null) {
+        tags.addAll(kvCreator.getVisibilityExpressionResolver()
+            .createVisibilityExpTags(cellVisibilityExpr));
+      }
+      // Add TTL directly to the KV so we can vary them when packing more than one KV
+      // into puts
+      if (ttl > 0) {
+        tags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(ttl)));
+      }
       cell = this.kvCreator.create(lineBytes, parsed.getRowKeyOffset(),
           parsed.getRowKeyLength(), parser.getFamily(i), 0, parser.getFamily(i).length,
           parser.getQualifier(i), 0, parser.getQualifier(i).length, ts, lineBytes,
-          parsed.getColumnOffset(i), parsed.getColumnLength(i), cellVisibilityExpr);
+          parsed.getColumnOffset(i), parsed.getColumnLength(i), tags);
     }
     put.add(cell);
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTSVWithTTLs.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTSVWithTTLs.java
new file mode 100644
index 0000000..a5cceb0
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTSVWithTTLs.java
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.hadoop.hbase.mapreduce; + +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.UUID; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.testclassification.MapReduceTests; +import org.apache.hadoop.hbase.client.Durability; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; +import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.wal.WALEdit; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({MapReduceTests.class, LargeTests.class}) +public class TestImportTSVWithTTLs implements Configurable { + + protected static final Log LOG = LogFactory.getLog(TestImportTSVWithTTLs.class); + protected static final String NAME = TestImportTsv.class.getSimpleName(); + protected static HBaseTestingUtility util = new HBaseTestingUtility(); + + /** + * Delete the tmp directory after running doMROnTableTest. Boolean. Default is + * false. + */ + protected static final String DELETE_AFTER_LOAD_CONF = NAME + ".deleteAfterLoad"; + + /** + * Force use of combiner in doMROnTableTest. Boolean. Default is true. + */ + protected static final String FORCE_COMBINER_CONF = NAME + ".forceCombiner"; + + private final String FAMILY = "FAM"; + private static Configuration conf; + + @Override + public Configuration getConf() { + return util.getConfiguration(); + } + + @Override + public void setConf(Configuration conf) { + throw new IllegalArgumentException("setConf not supported"); + } + + @BeforeClass + public static void provisionCluster() throws Exception { + conf = util.getConfiguration(); + // We don't check persistence in HFiles in this test, but if we ever do we will + // need this where the default hfile version is not 3 (i.e. 0.98) + conf.setInt("hfile.format.version", 3); + conf.set("hbase.coprocessor.region.classes", TTLCheckingObserver.class.getName()); + util.startMiniCluster(); + util.startMiniMapReduceCluster(); + } + + @AfterClass + public static void releaseCluster() throws Exception { + util.shutdownMiniMapReduceCluster(); + util.shutdownMiniCluster(); + } + + @Test + public void testMROnTable() throws Exception { + String tableName = "test-" + UUID.randomUUID(); + + // Prepare the arguments required for the test. 
+    String[] args = new String[] {
+        "-D" + ImportTsv.MAPPER_CONF_KEY
+            + "=org.apache.hadoop.hbase.mapreduce.TsvImporterMapper",
+        "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B,HBASE_CELL_TTL",
+        "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b", tableName };
+    String data = "KEY\u001bVALUE1\u001bVALUE2\u001b1000000\n";
+    util.createTable(TableName.valueOf(tableName), FAMILY);
+    doMROnTableTest(util, FAMILY, data, args, 1);
+    util.deleteTable(tableName);
+  }
+
+  protected static Tool doMROnTableTest(HBaseTestingUtility util, String family, String data,
+      String[] args, int valueMultiplier) throws Exception {
+    TableName table = TableName.valueOf(args[args.length - 1]);
+    Configuration conf = new Configuration(util.getConfiguration());
+
+    // populate input file
+    FileSystem fs = FileSystem.get(conf);
+    Path inputPath = fs.makeQualified(new Path(util
+        .getDataTestDirOnTestFS(table.getNameAsString()), "input.dat"));
+    FSDataOutputStream op = fs.create(inputPath, true);
+    op.write(Bytes.toBytes(data));
+    op.close();
+    LOG.debug(String.format("Wrote test data to file: %s", inputPath));
+
+    if (conf.getBoolean(FORCE_COMBINER_CONF, true)) {
+      LOG.debug("Forcing combiner.");
+      conf.setInt("mapreduce.map.combine.minspills", 1);
+    }
+
+    // run the import
+    List<String> argv = new ArrayList<String>(Arrays.asList(args));
+    argv.add(inputPath.toString());
+    Tool tool = new ImportTsv();
+    LOG.debug("Running ImportTsv with arguments: " + argv);
+    try {
+      // Job will fail if observer rejects entries without TTL
+      assertEquals(0, ToolRunner.run(conf, tool, argv.toArray(args)));
+    } finally {
+      // Clean up
+      if (conf.getBoolean(DELETE_AFTER_LOAD_CONF, true)) {
+        LOG.debug("Deleting test subdirectory");
+        util.cleanupDataTestDirOnTestFS(table.getNameAsString());
+      }
+    }
+
+    return tool;
+  }
+
+  public static class TTLCheckingObserver extends BaseRegionObserver {
+
+    @Override
+    public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit,
+        Durability durability) throws IOException {
+      HRegion region = e.getEnvironment().getRegion();
+      if (!region.getRegionInfo().isMetaTable()
+          && !region.getRegionInfo().getTable().isSystemTable()) {
+        // The put carries the TTL attribute
+        if (put.getTTL() != Long.MAX_VALUE) {
+          return;
+        }
+        throw new IOException("Operation does not have TTL set");
+      }
+    }
+  }
+}
-- 
1.7.12.4 (Apple Git-37)
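
Editor's note: the patch wires up two distinct TTL paths, and the difference is easy to miss in the diff. Below is a minimal standalone sketch of both; it is not part of the patch. It assumes an HBase 0.98/1.x-era classpath, where Tag is still a concrete class (it became an interface in 2.0) and where Put.setTTL, TagType.TTL_TAG_TYPE, and the tag-accepting KeyValue constructor called by CellCreator above all exist. The class name and the sample row, family, and values are invented for illustration; TTL values are in milliseconds.

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.TagType;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class CellTTLSketch {
  public static void main(String[] args) {
    long ttl = 1000000L; // milliseconds, like the last column of the test data above

    // Online path (no importtsv.bulk.output): TsvImporterMapper sets the TTL as a
    // mutation attribute and the regionserver applies it to every cell in the Put.
    Put put = new Put(Bytes.toBytes("KEY"));
    put.setTTL(ttl);

    // Bulk-load path: HFiles are written directly, bypassing the server, so
    // TextSortReducer and TsvImporterMapper attach the TTL as a per-cell tag,
    // which also lets different cells in one row carry different TTLs.
    List<Tag> tags = new ArrayList<Tag>();
    tags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(ttl)));
    byte[] row = Bytes.toBytes("KEY");
    byte[] fam = Bytes.toBytes("FAM");
    byte[] qual = Bytes.toBytes("A");
    byte[] value = Bytes.toBytes("VALUE1");
    KeyValue kv = new KeyValue(row, 0, row.length, fam, 0, fam.length, qual, 0,
        qual.length, System.currentTimeMillis(), KeyValue.Type.Put, value, 0,
        value.length, tags);
    System.out.println(kv);
  }
}

The split mirrors the patch's design: a plain Put can carry the TTL as an attribute because the server translates it into cell-level state itself, while the HFile path never passes through the server and therefore must write the tag explicitly.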