diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
index b53bb28..a2cb10b 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
@@ -112,9 +112,15 @@ public class ImportTsv extends Configured implements Tool {
     public static final String ATTRIBUTES_COLUMN_SPEC = "HBASE_ATTRIBUTES_KEY";
 
+    public static final String CELL_VISIBILITY_COLUMN_SPEC = "HBASE_CELL_VISIBILITY";
+
     private int attrKeyColumnIndex = DEFAULT_ATTRIBUTES_COLUMN_INDEX;
 
     public static final int DEFAULT_ATTRIBUTES_COLUMN_INDEX = -1;
+
+    public static final int DEFAULT_CELL_VISIBILITY_COLUMN_INDEX = -1;
+
+    private int cellVisibilityColumnIndex = DEFAULT_CELL_VISIBILITY_COLUMN_INDEX;
 
     /**
      * @param columnsSpecification the list of columns to parse out, comma separated.
      * The row key should be the special token TsvParser.ROWKEY_COLUMN_SPEC
@@ -149,6 +155,10 @@ public class ImportTsv extends Configured implements Tool {
           attrKeyColumnIndex = i;
           continue;
         }
+        if (CELL_VISIBILITY_COLUMN_SPEC.equals(str)) {
+          cellVisibilityColumnIndex = i;
+          continue;
+        }
         String[] parts = str.split(":", 2);
         if (parts.length == 1) {
           families[i] = str.getBytes();
@@ -172,9 +182,17 @@ public class ImportTsv extends Configured implements Tool {
       return attrKeyColumnIndex != DEFAULT_ATTRIBUTES_COLUMN_INDEX;
     }
 
+    public boolean hasCellVisibility() {
+      return cellVisibilityColumnIndex != DEFAULT_CELL_VISIBILITY_COLUMN_INDEX;
+    }
+
     public int getAttributesKeyColumnIndex() {
       return attrKeyColumnIndex;
     }
+
+    public int getCellVisibilityColumnIndex() {
+      return cellVisibilityColumnIndex;
+    }
 
     public int getRowKeyColumnIndex() {
       return rowKeyColumnIndex;
     }
@@ -209,6 +227,8 @@ public class ImportTsv extends Configured implements Tool {
         throw new BadTsvLineException("No timestamp");
       } else if (hasAttributes() && tabOffsets.size() <= getAttributesKeyColumnIndex()) {
         throw new BadTsvLineException("No attributes specified");
+      } else if (hasCellVisibility() && tabOffsets.size() <= getCellVisibilityColumnIndex()) {
+        throw new BadTsvLineException("No cell visibility specified");
       }
       return new ParsedLine(tabOffsets, lineBytes);
     }
@@ -279,8 +299,32 @@ public class ImportTsv extends Configured implements Tool {
           return DEFAULT_ATTRIBUTES_COLUMN_INDEX;
         }
       }
-
-
+
+      public int getCellVisibilityColumnOffset() {
+        if (hasCellVisibility()) {
+          return getColumnOffset(cellVisibilityColumnIndex);
+        } else {
+          return DEFAULT_CELL_VISIBILITY_COLUMN_INDEX;
+        }
+      }
+
+      public int getCellVisibilityColumnLength() {
+        if (hasCellVisibility()) {
+          return getColumnLength(cellVisibilityColumnIndex);
+        } else {
+          return DEFAULT_CELL_VISIBILITY_COLUMN_INDEX;
+        }
+      }
+
+      public String getCellVisibility() {
+        if (!hasCellVisibility()) {
+          return null;
+        } else {
+          return Bytes.toString(lineBytes, getColumnOffset(cellVisibilityColumnIndex),
+              getColumnLength(cellVisibilityColumnIndex));
+        }
+      }
+
       public int getColumnOffset(int idx) {
         if (idx > 0)
           return tabOffsets.get(idx - 1) + 1;
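For reference, a minimal sketch of the new parser surface introduced above, mirroring testTsvParserWithCellVisibilityCol at the end of this patch. The column spec and input line here are illustrative only:

package org.apache.hadoop.hbase.mapreduce;

import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser;
import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser.ParsedLine;
import org.apache.hadoop.hbase.util.Bytes;

public class CellVisibilityParseSketch {
  public static void main(String[] args) throws Exception {
    // HBASE_CELL_VISIBILITY marks the column that carries the visibility expression.
    TsvParser parser = new TsvParser("HBASE_ROW_KEY,FAM:A,HBASE_CELL_VISIBILITY", "\t");
    byte[] line = Bytes.toBytes("row1\tvalue1\tsecret&private");
    ParsedLine parsed = parser.parse(line, line.length);
    // The expression stays a plain String until tags are built downstream.
    System.out.println(parsed.getCellVisibility()); // prints: secret&private
  }
}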
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MapReduceLabelExpander.java hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MapReduceLabelExpander.java
new file mode 100644
index 0000000..bd0dd92
--- /dev/null
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MapReduceLabelExpander.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import static org.apache.hadoop.hbase.security.visibility.VisibilityConstants.LABELS_TABLE_FAMILY;
+import static org.apache.hadoop.hbase.security.visibility.VisibilityConstants.LABELS_TABLE_NAME;
+import static org.apache.hadoop.hbase.security.visibility.VisibilityConstants.LABEL_QUALIFIER;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValue.Type;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser.BadTsvLineException;
+import org.apache.hadoop.hbase.security.visibility.Authorizations;
+import org.apache.hadoop.hbase.security.visibility.ExpressionExpander;
+import org.apache.hadoop.hbase.security.visibility.ExpressionParser;
+import org.apache.hadoop.hbase.security.visibility.ParseException;
+import org.apache.hadoop.hbase.security.visibility.VisibilityUtils;
+import org.apache.hadoop.hbase.security.visibility.expression.ExpressionNode;
+import org.apache.hadoop.hbase.security.visibility.expression.LeafExpressionNode;
+import org.apache.hadoop.hbase.security.visibility.expression.NonLeafExpressionNode;
+import org.apache.hadoop.hbase.security.visibility.expression.Operator;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.WritableUtils;
+
+/**
+ * A utility class for mappers and reducers that write cell visibility. It scans
+ * the visibility labels table, and parses and expands visibility expressions
+ * into visibility tags.
+ */
+public class MapReduceLabelExpander {
+  private Configuration conf;
+  private ExpressionParser parser = new ExpressionParser();
+  private ExpressionExpander expander = new ExpressionExpander();
+
+  public MapReduceLabelExpander(Configuration conf) {
+    this.conf = conf;
+  }
+
+  private Map<String, Integer> labels;
+
+  // TODO: This duplicates code in VisibilityController; refactoring may be needed.
+  public List<Tag> createVisibilityTags(String visibilityLabelsExp) throws IOException,
+      BadTsvLineException {
+    ExpressionNode node = null;
+    try {
+      node = parser.parse(visibilityLabelsExp);
+    } catch (ParseException e) {
+      throw new BadTsvLineException(e.getMessage());
+    }
+    node = expander.expand(node);
+    List<Tag> tags = new ArrayList<Tag>();
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    DataOutputStream dos = new DataOutputStream(baos);
+    if (node.isSingleNode()) {
+      writeLabelOrdinalsToStream(node, dos);
+      tags.add(new Tag(VisibilityUtils.VISIBILITY_TAG_TYPE, baos.toByteArray()));
+      baos.reset();
+    } else {
+      NonLeafExpressionNode nlNode = (NonLeafExpressionNode) node;
+      if (nlNode.getOperator() == Operator.OR) {
+        for (ExpressionNode child : nlNode.getChildExps()) {
+          writeLabelOrdinalsToStream(child, dos);
+          tags.add(new Tag(VisibilityUtils.VISIBILITY_TAG_TYPE, baos.toByteArray()));
+          baos.reset();
+        }
+      } else {
+        writeLabelOrdinalsToStream(nlNode, dos);
+        tags.add(new Tag(VisibilityUtils.VISIBILITY_TAG_TYPE, baos.toByteArray()));
+        baos.reset();
+      }
+    }
+    return tags;
+  }
+
+  private void writeLabelOrdinalsToStream(ExpressionNode node, DataOutputStream dos)
+      throws IOException, BadTsvLineException {
+    if (node.isSingleNode()) {
+      String identifier = null;
+      int labelOrdinal = 0;
+      if (node instanceof LeafExpressionNode) {
+        identifier = ((LeafExpressionNode) node).getIdentifier();
+        if (this.labels.get(identifier) != null) {
+          labelOrdinal = this.labels.get(identifier);
+        }
+      } else {
+        // This is a NOT node.
+        LeafExpressionNode lNode = (LeafExpressionNode) ((NonLeafExpressionNode) node)
+            .getChildExps().get(0);
+        identifier = lNode.getIdentifier();
+        if (this.labels.get(identifier) != null) {
+          labelOrdinal = this.labels.get(identifier);
+          labelOrdinal = -1 * labelOrdinal; // Store NOT node as -ve ordinal.
+        }
+      }
+      if (labelOrdinal == 0) {
+        throw new BadTsvLineException("Invalid visibility label " + identifier);
+      }
+      WritableUtils.writeVInt(dos, labelOrdinal);
+    } else {
+      List<ExpressionNode> childExps = ((NonLeafExpressionNode) node).getChildExps();
+      for (ExpressionNode child : childExps) {
+        writeLabelOrdinalsToStream(child, dos);
+      }
+    }
+  }
+
+  private void createLabels() throws IOException {
+    // This scan should be done by a user with global admin privileges; ensure
+    // that it works.
+    HTable visibilityLabelsTable = null;
+    try {
+      labels = new HashMap<String, Integer>();
+      visibilityLabelsTable = new HTable(conf, LABELS_TABLE_NAME.getName());
+      Scan scan = new Scan();
+      scan.setAuthorizations(new Authorizations(VisibilityUtils.SYSTEM_LABEL));
+      scan.addColumn(LABELS_TABLE_FAMILY, LABEL_QUALIFIER);
+      ResultScanner scanner = visibilityLabelsTable.getScanner(scan);
+      while (true) {
+        Result next = scanner.next();
+        if (next == null) {
+          break;
+        }
+        byte[] row = next.getRow();
+        byte[] value = next.getValue(LABELS_TABLE_FAMILY, LABEL_QUALIFIER);
+        labels.put(Bytes.toString(value), Bytes.toInt(row));
+      }
+      scanner.close();
+    } finally {
+      if (visibilityLabelsTable != null) {
+        visibilityLabelsTable.close();
+      }
+    }
+  }
+
+  /**
+   * Creates a KeyValue from the cell visibility expression specified on the
+   * ImportTsv line, attaching the expanded expression as visibility tags on
+   * the KeyValue.
+   * @param rowKeyOffset
+   * @param rowKeyLength
+   * @param family
+   * @param familyOffset
+   * @param familyLength
+   * @param qualifier
+   * @param qualifierOffset
+   * @param qualifierLength
+   * @param ts
+   * @param put
+   * @param lineBytes
+   * @param columnOffset
+   * @param columnLength
+   * @param cellVisibilityExpr
+   * @return the created KeyValue
+   * @throws IOException
+   * @throws BadTsvLineException
+   */
+  public KeyValue createKVFromCellVisibilityExpr(int rowKeyOffset, int rowKeyLength, byte[] family,
+      int familyOffset, int familyLength, byte[] qualifier, int qualifierOffset,
+      int qualifierLength, long ts, Type put, byte[] lineBytes, int columnOffset, int columnLength,
+      String cellVisibilityExpr) throws IOException, BadTsvLineException {
+    if (this.labels == null && cellVisibilityExpr != null) {
+      createLabels();
+    }
+    KeyValue kv = null;
+    if (cellVisibilityExpr != null) {
+      // Apply the expansion and parsing here
+      List<Tag> visibilityTags = createVisibilityTags(cellVisibilityExpr);
+      kv = new KeyValue(lineBytes, rowKeyOffset, rowKeyLength, family, familyOffset, familyLength,
+          qualifier, qualifierOffset, qualifierLength, ts, KeyValue.Type.Put, lineBytes,
+          columnOffset, columnLength, visibilityTags);
+    } else {
+      kv = new KeyValue(lineBytes, rowKeyOffset, rowKeyLength, family, familyOffset, familyLength,
+          qualifier, qualifierOffset, qualifierLength, ts, KeyValue.Type.Put, lineBytes,
+          columnOffset, columnLength);
+    }
+    return kv;
+  }
+}
\ No newline at end of file
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java
index d503079..a3bcd24 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java
@@ -58,6 +58,11 @@ public class TextSortReducer extends
   private ImportTsv.TsvParser parser;
 
+  /** Cell visibility expression */
+  private String cellVisibilityExpr;
+
+  private MapReduceLabelExpander mapReduceLabelExpander;
+
   public long getTs() {
     return ts;
   }
@@ -92,6 +97,7 @@ public class TextSortReducer extends
     if (parser.getRowKeyColumnIndex() == -1) {
       throw new RuntimeException("No row key column specified");
     }
+    mapReduceLabelExpander = new MapReduceLabelExpander(conf);
   }
 
   /**
@@ -140,16 +146,27 @@ public class TextSortReducer extends
           ImportTsv.TsvParser.ParsedLine parsed = parser.parse(lineBytes, line.getLength());
           // Retrieve timestamp if exists
           ts = parsed.getTimestamp(ts);
+          cellVisibilityExpr = parsed.getCellVisibility();
 
           for (int i = 0; i < parsed.getColumnCount(); i++) {
-            if (i == parser.getRowKeyColumnIndex() || i == parser.getTimestampKeyColumnIndex()) {
+            if (i == parser.getRowKeyColumnIndex() || i == parser.getTimestampKeyColumnIndex()
+                || i == parser.getAttributesKeyColumnIndex()
+                || i == parser.getCellVisibilityColumnIndex()) {
               continue;
             }
-            KeyValue kv = new KeyValue(lineBytes, parsed.getRowKeyOffset(),
-                parsed.getRowKeyLength(), parser.getFamily(i), 0,
-                parser.getFamily(i).length, parser.getQualifier(i), 0,
-                parser.getQualifier(i).length, ts, KeyValue.Type.Put,
-                lineBytes, parsed.getColumnOffset(i), parsed.getColumnLength(i));
+            KeyValue kv = null;
+            if (cellVisibilityExpr == null) {
+              kv = new KeyValue(lineBytes, parsed.getRowKeyOffset(), parsed.getRowKeyLength(),
+                  parser.getFamily(i), 0, parser.getFamily(i).length, parser.getQualifier(i), 0,
+                  parser.getQualifier(i).length, ts, KeyValue.Type.Put, lineBytes,
+                  parsed.getColumnOffset(i), parsed.getColumnLength(i));
+            } else {
+              // Should ensure that VisibilityController is present
+              kv = mapReduceLabelExpander.createKVFromCellVisibilityExpr(
+                  parsed.getRowKeyOffset(), parsed.getRowKeyLength(), parser.getFamily(i), 0,
+                  parser.getFamily(i).length, parser.getQualifier(i), 0,
+                  parser.getQualifier(i).length, ts, KeyValue.Type.Put, lineBytes,
+                  parsed.getColumnOffset(i), parsed.getColumnLength(i), cellVisibilityExpr);
+            }
             kvs.add(kv);
             curSize += kv.heapSize();
           }
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java
index 9e14212..0333378 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser.BadTsvLineException;
+import org.apache.hadoop.hbase.security.visibility.CellVisibility;
 import org.apache.hadoop.hbase.util.Base64;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
@@ -55,6 +56,12 @@ extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put>
 
   protected Configuration conf;
 
+  protected String cellVisibilityExpr;
+
+  private String hfileOutPath;
+
+  private MapReduceLabelExpander mapReduceLabelExpander;
+
   public long getTs() {
     return ts;
   }
@@ -89,6 +96,7 @@ extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put>
     if (parser.getRowKeyColumnIndex() == -1) {
       throw new RuntimeException("No row key column specified");
     }
+    mapReduceLabelExpander = new MapReduceLabelExpander(conf);
   }
 
   /**
@@ -113,6 +121,7 @@ extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put>
     skipBadLines = context.getConfiguration().getBoolean(
         ImportTsv.SKIP_LINES_CONF_KEY, true);
     badLineCount = context.getCounter("ImportTsv", "Bad Lines");
+    hfileOutPath = conf.get(ImportTsv.BULK_OUTPUT_CONF_KEY);
   }
 
   /**
@@ -133,11 +142,12 @@ extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put>
           parsed.getRowKeyLength());
       // Retrieve timestamp if exists
       ts = parsed.getTimestamp(ts);
+      cellVisibilityExpr = parsed.getCellVisibility();
 
       Put put = new Put(rowKey.copyBytes());
       for (int i = 0; i < parsed.getColumnCount(); i++) {
         if (i == parser.getRowKeyColumnIndex() || i == parser.getTimestampKeyColumnIndex()
-            || i == parser.getAttributesKeyColumnIndex()) {
+            || i == parser.getAttributesKeyColumnIndex()
+            || i == parser.getCellVisibilityColumnIndex()) {
           continue;
         }
         KeyValue kv = createPuts(lineBytes, parsed, put, i);
@@ -170,11 +180,24 @@ extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put>
   protected KeyValue createPuts(byte[] lineBytes, ImportTsv.TsvParser.ParsedLine parsed, Put put,
       int i) throws BadTsvLineException, IOException {
-    KeyValue kv;
-    kv = new KeyValue(lineBytes, parsed.getRowKeyOffset(), parsed.getRowKeyLength(),
-        parser.getFamily(i), 0, parser.getFamily(i).length, parser.getQualifier(i), 0,
-        parser.getQualifier(i).length, ts, KeyValue.Type.Put, lineBytes, parsed.getColumnOffset(i),
-        parsed.getColumnLength(i));
+    KeyValue kv = null;
+    if (hfileOutPath == null) {
+      kv = new KeyValue(lineBytes, parsed.getRowKeyOffset(), parsed.getRowKeyLength(),
+          parser.getFamily(i), 0, parser.getFamily(i).length, parser.getQualifier(i), 0,
+          parser.getQualifier(i).length, ts, KeyValue.Type.Put, lineBytes,
+          parsed.getColumnOffset(i), parsed.getColumnLength(i));
+      if (cellVisibilityExpr != null) {
+        // We won't be validating the expression here. The Visibility CP will do
+        // the validation.
+        put.setCellVisibility(new CellVisibility(cellVisibilityExpr));
+      }
+    } else {
+      kv = mapReduceLabelExpander.createKVFromCellVisibilityExpr(
+          parsed.getRowKeyOffset(), parsed.getRowKeyLength(), parser.getFamily(i), 0,
+          parser.getFamily(i).length, parser.getQualifier(i), 0,
+          parser.getQualifier(i).length, ts, KeyValue.Type.Put, lineBytes,
+          parsed.getColumnOffset(i), parsed.getColumnLength(i), cellVisibilityExpr);
+    }
     put.add(kv);
     return kv;
   }
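The hfileOutPath == null branch above relies on the normal write path: the raw expression travels with the Put, and the VisibilityController validates and expands it server side. A minimal client-side sketch of that path; the table, row, and labels are chosen for illustration:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.security.visibility.CellVisibility;
import org.apache.hadoop.hbase.util.Bytes;

public class PutWithVisibilitySketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    HTable table = new HTable(conf, "demo_table"); // illustrative table name
    Put put = new Put(Bytes.toBytes("row1"));
    put.add(Bytes.toBytes("FAM"), Bytes.toBytes("A"), Bytes.toBytes("value1"));
    // Not validated client side; the Visibility CP checks it on the server.
    put.setCellVisibility(new CellVisibility("secret&private"));
    table.put(put);
    table.close();
  }
}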
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityController.java hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityController.java
index 935c7a3..c21654b 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityController.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityController.java
@@ -799,6 +799,7 @@ public class VisibilityController extends BaseRegionObserver implements MasterOb
       if (node instanceof LeafExpressionNode) {
         identifier = ((LeafExpressionNode) node).getIdentifier();
+        LOG.debug("The identifier is " + identifier);
         labelOrdinal = this.visibilityManager.getLabelOrdinal(identifier);
       } else {
         // This is a NOT node.
@@ -949,11 +950,13 @@ public class VisibilityController extends BaseRegionObserver implements MasterOb
       // for non-rpc handling, fallback to system user
       user = User.getCurrent();
     }
+    LOG.debug("Current active user name is " + user.getShortName());
     return user;
   }
 
   private List<String> getSystemAndSuperUsers() throws IOException {
     User user = User.getCurrent();
+    LOG.debug("Current user name is " + user.getShortName());
     if (user == null) {
       throw new IOException("Unable to obtain the current user, "
           + "authorization checks for internal operations will not work correctly!");
@@ -1040,6 +1043,7 @@ public class VisibilityController extends BaseRegionObserver implements MasterOb
           Put p = new Put(Bytes.toBytes(ordinalCounter));
           p.addImmutable(LABELS_TABLE_FAMILY, LABEL_QUALIFIER, label, LABELS_TABLE_TAGS);
+          LOG.debug("Adding the label " + labelStr);
           puts.add(p);
           ordinalCounter++;
           response.addResult(successResult);
@@ -1264,6 +1268,7 @@ public class VisibilityController extends BaseRegionObserver implements MasterOb
       throw new IOException("Unable to retrieve calling user");
     }
     List<String> auths = this.visibilityManager.getAuths(user.getShortName());
+    LOG.debug("The list of auths is " + auths);
     if (!auths.contains(SYSTEM_LABEL)) {
       throw new AccessDeniedException("User '" + user.getShortName()
           + "' is not authorized to perform this action.");
+ */ +package org.apache.hadoop.hbase.mapreduce; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.security.PrivilegedExceptionAction; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.UUID; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.LargeTests; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.protobuf.generated.VisibilityLabelsProtos.VisibilityLabelsResponse; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.security.visibility.Authorizations; +import org.apache.hadoop.hbase.security.visibility.ScanLabelGenerator; +import org.apache.hadoop.hbase.security.visibility.SimpleScanLabelGenerator; +import org.apache.hadoop.hbase.security.visibility.VisibilityClient; +import org.apache.hadoop.hbase.security.visibility.VisibilityConstants; +import org.apache.hadoop.hbase.security.visibility.VisibilityController; +import org.apache.hadoop.hbase.security.visibility.VisibilityUtils; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.mapred.Utils.OutputFileUtils.OutputFilesFilter; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(LargeTests.class) +public class TestImportTSVWithVisibilityLabels implements Configurable { + + protected static final Log LOG = LogFactory.getLog(TestImportTSVWithVisibilityLabels.class); + protected static final String NAME = TestImportTsv.class.getSimpleName(); + protected static HBaseTestingUtility util = new HBaseTestingUtility(); + + /** + * Delete the tmp directory after running doMROnTableTest. Boolean. Default is + * false. + */ + protected static final String DELETE_AFTER_LOAD_CONF = NAME + ".deleteAfterLoad"; + + /** + * Force use of combiner in doMROnTableTest. Boolean. Default is true. 
+ */ + protected static final String FORCE_COMBINER_CONF = NAME + ".forceCombiner"; + + private final String FAMILY = "FAM"; + private final static String TOPSECRET = "topsecret"; + private final static String PUBLIC = "public"; + private final static String PRIVATE = "private"; + private final static String CONFIDENTIAL = "confidential"; + private final static String SECRET = "secret"; + private static User SUPERUSER; + private static Configuration conf; + + public Configuration getConf() { + return util.getConfiguration(); + } + + public void setConf(Configuration conf) { + throw new IllegalArgumentException("setConf not supported"); + } + + @BeforeClass + public static void provisionCluster() throws Exception { + conf = util.getConfiguration(); + SUPERUSER = User.createUserForTesting(conf, "admin", new String[] { "supergroup" }); + conf.set("hbase.superuser", "admin,"+User.getCurrent().getName()); + conf.setInt("hfile.format.version", 3); + conf.set("hbase.coprocessor.master.classes", VisibilityController.class.getName()); + conf.set("hbase.coprocessor.region.classes", VisibilityController.class.getName()); + conf.setClass(VisibilityUtils.VISIBILITY_LABEL_GENERATOR_CLASS, SimpleScanLabelGenerator.class, + ScanLabelGenerator.class); + util.startMiniCluster(); + // Wait for the labels table to become available + util.waitTableEnabled(VisibilityConstants.LABELS_TABLE_NAME.getName(), 50000); + createLabels(); + HBaseAdmin admin = new HBaseAdmin(util.getConfiguration()); + util.startMiniMapReduceCluster(); + } + + private static void createLabels() throws IOException, InterruptedException { + PrivilegedExceptionAction action = + new PrivilegedExceptionAction() { + public VisibilityLabelsResponse run() throws Exception { + String[] labels = { SECRET, TOPSECRET, CONFIDENTIAL, PUBLIC, PRIVATE }; + try { + VisibilityClient.addLabels(conf, labels); + LOG.info("Added labels "); + } catch (Throwable t) { + LOG.error("Error in adding labels" , t); + throw new IOException(t); + } + return null; + } + }; + SUPERUSER.runAs(action); + } + + @AfterClass + public static void releaseCluster() throws Exception { + util.shutdownMiniMapReduceCluster(); + util.shutdownMiniCluster(); + } + + @Test + public void testMROnTable() throws Exception { + String tableName = "test-" + UUID.randomUUID(); + + // Prepare the arguments required for the test. + String[] args = new String[] { + "-D" + ImportTsv.MAPPER_CONF_KEY + + "=org.apache.hadoop.hbase.mapreduce.TsvImporterMapper", + "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B,HBASE_CELL_VISIBILITY", + "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b", tableName }; + String data = "KEY\u001bVALUE1\u001bVALUE2\u001bsecret&private\n"; + util.createTable(tableName, FAMILY); + doMROnTableTest(util, FAMILY, data, args, 1); + util.deleteTable(tableName); + } + + @Test + public void testMROnTableWithBulkload() throws Exception { + String tableName = "test-" + UUID.randomUUID(); + Path hfiles = new Path(util.getDataTestDirOnTestFS(tableName), "hfiles"); + // Prepare the arguments required for the test. 
+ String[] args = new String[] { + "-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=" + hfiles.toString(), + "-D" + ImportTsv.COLUMNS_CONF_KEY + + "=HBASE_ROW_KEY,FAM:A,FAM:B,HBASE_CELL_VISIBILITY", + "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b", tableName }; + String data = "KEY\u001bVALUE1\u001bVALUE2\u001bsecret&private\n"; + util.createTable(tableName, FAMILY); + doMROnTableTest(util, FAMILY, data, args, 1); + util.deleteTable(tableName); + } + + @Test + public void testBulkOutputWithTsvImporterTextMapper() throws Exception { + String table = "test-" + UUID.randomUUID(); + String FAMILY = "FAM"; + Path bulkOutputPath = new Path(util.getDataTestDir(table),"hfiles"); + // Prepare the arguments required for the test. + String[] args = + new String[] { + "-D" + ImportTsv.MAPPER_CONF_KEY + + "=org.apache.hadoop.hbase.mapreduce.TsvImporterTextMapper", + "-D" + ImportTsv.COLUMNS_CONF_KEY + + "=HBASE_ROW_KEY,FAM:A,FAM:B,HBASE_CELL_VISIBILITY", + "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b", + "-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=" + bulkOutputPath.toString(), table + }; + String data = "KEY\u001bVALUE4\u001bVALUE8\u001bsecret&private\n"; + doMROnTableTest(util, FAMILY, data, args, 4); + util.deleteTable(table); + } + + @Test + public void testMRWithOutputFormat() throws Exception { + String tableName = "test-" + UUID.randomUUID(); + Path hfiles = new Path(util.getDataTestDirOnTestFS(tableName), "hfiles"); + // Prepare the arguments required for the test. + String[] args = new String[] { + "-D" + ImportTsv.MAPPER_CONF_KEY + + "=org.apache.hadoop.hbase.mapreduce.TsvImporterMapper", + "-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=" + hfiles.toString(), + "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B,HBASE_CELL_VISIBILITY", + "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b", tableName }; + String data = "KEY\u001bVALUE4\u001bVALUE8\u001bsecret&private\n"; + util.createTable(tableName, FAMILY); + doMROnTableTest(util, FAMILY, data, args, 1); + util.deleteTable(tableName); + } + + /** + * Run an ImportTsv job and perform basic validation on the results. Returns + * the ImportTsv Tool instance so that other tests can inspect it + * for further validation as necessary. This method is static to insure + * non-reliance on instance's util/conf facilities. + * + * @param args + * Any arguments to pass BEFORE inputFile path is appended. + * @return The Tool instance used to run the test. + */ + protected static Tool doMROnTableTest(HBaseTestingUtility util, String family, String data, + String[] args, int valueMultiplier) throws Exception { + String table = args[args.length - 1]; + Configuration conf = new Configuration(util.getConfiguration()); + + // populate input file + FileSystem fs = FileSystem.get(conf); + Path inputPath = fs.makeQualified(new Path(util.getDataTestDirOnTestFS(table), "input.dat")); + FSDataOutputStream op = fs.create(inputPath, true); + if (data == null) { + data = "KEY\u001bVALUE1\u001bVALUE2\n"; + } + op.write(Bytes.toBytes(data)); + op.close(); + LOG.debug(String.format("Wrote test data to file: %s", inputPath)); + + if (conf.getBoolean(FORCE_COMBINER_CONF, true)) { + LOG.debug("Forcing combiner."); + conf.setInt("min.num.spills.for.combine", 1); + } + + // run the import + List argv = new ArrayList(Arrays.asList(args)); + argv.add(inputPath.toString()); + Tool tool = new ImportTsv(); + LOG.debug("Running ImportTsv with arguments: " + argv); + assertEquals(0, ToolRunner.run(conf, tool, argv.toArray(args))); + + // Perform basic validation. 
If the input args did not include + // ImportTsv.BULK_OUTPUT_CONF_KEY then validate data in the table. + // Otherwise, validate presence of hfiles. + boolean createdHFiles = false; + String outputPath = null; + for (String arg : argv) { + if (arg.contains(ImportTsv.BULK_OUTPUT_CONF_KEY)) { + createdHFiles = true; + // split '-Dfoo=bar' on '=' and keep 'bar' + outputPath = arg.split("=")[1]; + break; + } + } + LOG.debug("validating the table " + createdHFiles); + if (createdHFiles) + validateHFiles(fs, outputPath, family); + else + validateTable(conf, table, family, valueMultiplier); + + if (conf.getBoolean(DELETE_AFTER_LOAD_CONF, true)) { + LOG.debug("Deleting test subdirectory"); + util.cleanupDataTestDirOnTestFS(table); + } + return tool; + } + + /** + * Confirm ImportTsv via HFiles on fs. + */ + private static void validateHFiles(FileSystem fs, String outputPath, String family) + throws IOException { + + // validate number and content of output columns + LOG.debug("Validating HFiles."); + Set configFamilies = new HashSet(); + configFamilies.add(family); + Set foundFamilies = new HashSet(); + for (FileStatus cfStatus : fs.listStatus(new Path(outputPath), new OutputFilesFilter())) { + LOG.debug("The output path has files"); + String[] elements = cfStatus.getPath().toString().split(Path.SEPARATOR); + String cf = elements[elements.length - 1]; + foundFamilies.add(cf); + assertTrue(String.format( + "HFile ouput contains a column family (%s) not present in input families (%s)", cf, + configFamilies), configFamilies.contains(cf)); + for (FileStatus hfile : fs.listStatus(cfStatus.getPath())) { + assertTrue(String.format("HFile %s appears to contain no data.", hfile.getPath()), + hfile.getLen() > 0); + } + } + } + + /** + * Confirm ImportTsv via data in online table. + */ + private static void validateTable(Configuration conf, String tableName, String family, + int valueMultiplier) throws IOException { + + LOG.debug("Validating table."); + HTable table = new HTable(conf, tableName); + boolean verified = false; + long pause = conf.getLong("hbase.client.pause", 5 * 1000); + int numRetries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5); + for (int i = 0; i < numRetries; i++) { + try { + Scan scan = new Scan(); + // Scan entire family. + scan.addFamily(Bytes.toBytes(family)); + scan.setAuthorizations(new Authorizations("secret","private")); + ResultScanner resScanner = table.getScanner(scan); + Result[] next = resScanner.next(5); + assertEquals(1, next.length); + for (Result res : resScanner) { + LOG.debug("Getting results " + res.size()); + assertTrue(res.size() == 2); + List kvs = res.listCells(); + assertTrue(CellUtil.matchingRow(kvs.get(0), Bytes.toBytes("KEY"))); + assertTrue(CellUtil.matchingRow(kvs.get(1), Bytes.toBytes("KEY"))); + assertTrue(CellUtil.matchingValue(kvs.get(0), Bytes.toBytes("VALUE" + valueMultiplier))); + assertTrue(CellUtil.matchingValue(kvs.get(1), + Bytes.toBytes("VALUE" + 2 * valueMultiplier))); + // Only one result set is expected, so let it loop. + } + verified = true; + break; + } catch (NullPointerException e) { + // If here, a cell was empty. Presume its because updates came in + // after the scanner had been opened. Wait a while and retry. 
+ } + try { + Thread.sleep(pause); + } catch (InterruptedException e) { + // continue + } + } + table.close(); + assertTrue(verified); + } + +} \ No newline at end of file diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java index 4e83eeb..5333881 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java @@ -18,7 +18,6 @@ */ package org.apache.hadoop.hbase.mapreduce; -import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -42,7 +41,6 @@ import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.LargeTests; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Result; diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsvParser.java hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsvParser.java index 474c388..29eadf1 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsvParser.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsvParser.java @@ -294,5 +294,20 @@ public class TestImportTsvParser { i++; } } + + @Test + public void testTsvParserWithCellVisibilityCol() throws BadTsvLineException { + TsvParser parser = new TsvParser( + "HBASE_ROW_KEY,col_a,HBASE_TS_KEY,HBASE_ATTRIBUTES_KEY,HBASE_CELL_VISIBILITY", "\t"); + assertEquals(0, parser.getRowKeyColumnIndex()); + assertEquals(4, parser.getCellVisibilityColumnIndex()); + byte[] line = Bytes.toBytes("rowkey\tval_a\t1234\tkey=>value\tPRIVATE&SECRET"); + ParsedLine parse = parser.parse(line, line.length); + assertEquals(18, parse.getAttributeKeyOffset()); + assertEquals(3, parser.getAttributesKeyColumnIndex()); + String attributes[] = parse.getIndividualAttributes(); + assertEquals(attributes[0], "key=>value"); + assertEquals(29, parse.getCellVisibilityColumnOffset()); + } }