Index: hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java (revision 1585081) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java (working copy) @@ -92,6 +92,7 @@ import org.apache.hadoop.hbase.security.access.Permission.Action; import org.apache.hadoop.hbase.util.ByteRange; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.SimpleByteRange; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; @@ -512,12 +513,20 @@ // can stop at the first which does not grant access. int cellsChecked = 0; if (canPersistCellACLs) { - Get get = new Get(row); - if (timestamp != HConstants.LATEST_TIMESTAMP) get.setTimeStamp(timestamp); + Scan scan = new Scan(row, row); + if (timestamp != HConstants.LATEST_TIMESTAMP) { + scan.setTimeRange(0, timestamp + 1); + } else { + // Bound our search to the current (server) time. Storing values with timestamps in the + // future is bad practice and can lead to surprises but we allow that. If cells with + // timestamps in the future have ACLs, permissions from those ACLs will incorrectly be + // considered for authorizing the pending mutation at current server time. + scan.setTimeRange(0, EnvironmentEdgeManager.currentTimeMillis() + 1); + } if (allVersions) { - get.setMaxVersions(); + scan.setMaxVersions(); } else { - get.setMaxVersions(1); + scan.setMaxVersions(1); } for (Map.Entry> entry: familyMap.entrySet()) { byte[] col = entry.getKey(); @@ -526,25 +535,25 @@ if (entry.getValue() instanceof Set) { Set set = (Set)entry.getValue(); if (set == null || set.isEmpty()) { - get.addFamily(col); + scan.addFamily(col); } else { for (byte[] qual: set) { - get.addColumn(col, qual); + scan.addColumn(col, qual); } } } else if (entry.getValue() instanceof List) { List list = (List)entry.getValue(); if (list == null || list.isEmpty()) { - get.addFamily(col); + scan.addFamily(col); } else { // In case of family delete, a Cell will be added into the list with Qualifier as null. for (Cell cell : list) { if (cell.getQualifierLength() == 0 && (cell.getTypeByte() == Type.DeleteFamily.getCode() || cell.getTypeByte() == Type.DeleteFamilyVersion.getCode())) { - get.addFamily(col); + scan.addFamily(col); } else { - get.addColumn(col, CellUtil.cloneQualifier(cell)); + scan.addColumn(col, CellUtil.cloneQualifier(cell)); } } } @@ -554,9 +563,9 @@ } } if (LOG.isTraceEnabled()) { - LOG.trace("Scanning for cells with " + get); + LOG.trace("Scanning for cells with " + scan); } - RegionScanner scanner = getRegion(e).getScanner(new Scan(get)); + RegionScanner scanner = getRegion(e).getScanner(scan); List cells = Lists.newArrayList(); try { boolean more = false; Index: hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java (revision 1585081) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java (working copy) @@ -1333,6 +1333,90 @@ } @Test + public void testCellPermissionsWithDeleteWithTs() throws Exception { + final byte[] TEST_ROW1 = Bytes.toBytes("r1"); + // final byte[] TEST_ROW2 = Bytes.toBytes("r2"); + final byte[] TEST_Q1 = Bytes.toBytes("q1"); + final byte[] TEST_Q2 = Bytes.toBytes("q2"); + final byte[] ZERO = Bytes.toBytes(0L); + final byte[] ONE = Bytes.toBytes(1L); + final byte[] TWO = Bytes.toBytes(2L); + + // additional test user + final User user1 = User.createUserForTesting(conf, "user1", new String[0]); + final User user2 = User.createUserForTesting(conf, "user2", new String[0]); + + HTable t = new HTable(conf, TEST_TABLE.getTableName()); + try { + // This version (TS = 123)with rw ACL for "user1" and "user2" + Put p = new Put(TEST_ROW1); + p.add(TEST_FAMILY, TEST_Q1, 123L, ZERO); + p.add(TEST_FAMILY, TEST_Q2, 123L, ZERO); + Map perms = new HashMap(); + perms.put(user1.getShortName(), new Permission(Permission.Action.READ, + Permission.Action.WRITE)); + perms.put(user2.getShortName(), new Permission(Permission.Action.READ, + Permission.Action.WRITE)); + p.setACL(perms); + t.put(p); + + // This version (TS = 125) with rw ACL for "user1" + p = new Put(TEST_ROW1); + p.add(TEST_FAMILY, TEST_Q1, 125L, ONE); + p.add(TEST_FAMILY, TEST_Q2, 125L, ONE); + perms = new HashMap(); + perms.put(user1.getShortName(), new Permission(Permission.Action.READ, + Permission.Action.WRITE)); + p.setACL(perms); + t.put(p); + + // This version (TS = 127) with rw ACL for "user1" + p = new Put(TEST_ROW1); + p.add(TEST_FAMILY, TEST_Q1, 127L, TWO); + p.add(TEST_FAMILY, TEST_Q2, 127L, TWO); + perms = new HashMap(); + perms.put(user1.getShortName(), new Permission(Permission.Action.READ, + Permission.Action.WRITE)); + p.setACL(perms); + t.put(p); + } finally { + t.close(); + } + + // user2 should be allowed to delete the column f1:q1 versions older than TS 124L + user2.runAs(new PrivilegedExceptionAction() { + @Override + public Void run() throws Exception { + HTable t = new HTable(conf, TEST_TABLE.getTableName()); + try { + Delete d = new Delete(TEST_ROW1, 124L); + d.deleteColumns(TEST_FAMILY, TEST_Q1); + t.delete(d); + } finally { + t.close(); + } + return null; + } + }); + + // user2 should be allowed to delete the column f1:q2 versions older than TS 124L + user2.runAs(new PrivilegedExceptionAction() { + @Override + public Void run() throws Exception { + HTable t = new HTable(conf, TEST_TABLE.getTableName()); + try { + Delete d = new Delete(TEST_ROW1); + d.deleteColumns(TEST_FAMILY, TEST_Q2, 124L); + t.delete(d); + } finally { + t.close(); + } + return null; + } + }); + } + + @Test public void testGrantRevoke() throws Exception { AccessTestAction grantAction = new AccessTestAction() { @Override