diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java index 99fe157..c248cd2 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java @@ -41,6 +41,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.client.Durability; +import org.apache.hadoop.hbase.client.TimestampPolicy; import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; @@ -192,6 +193,19 @@ public class HTableDescriptor implements WritableComparable { /** Default durability for HTD is USE_DEFAULT, which defaults to HBase-global default value */ private static final Durability DEFAULT_DURABLITY = Durability.USE_DEFAULT; + /** + * INTERNAL {@link TimestampPolicy} setting for the table. + */ + public static final String TSPOLICY = "TSPOLICY"; + private static final ImmutableBytesWritable TSPOLICY_KEY = + new ImmutableBytesWritable(Bytes.toBytes("TSPOLICY")); + + /** Default {@link TimestampPolicy} for HTD is {@link TimestampPolicy#SERVER_TS} */ + private static final TimestampPolicy DEFAULT_TSPOLICY = TimestampPolicy.SERVER_TS; + + /** Default {@link TimestampPolicy} for legacy table is {@link TimestampPolicy#MIXED} */ + private static final TimestampPolicy LEGACY_TSPOLICY = TimestampPolicy.MIXED; + /* * The below are ugly but better than creating them each time till we * replace booleans being saved as Strings with plain booleans. 
Need a @@ -242,6 +256,7 @@ public class HTableDescriptor implements WritableComparable { } RESERVED_KEYWORDS.add(IS_ROOT_KEY); RESERVED_KEYWORDS.add(IS_META_KEY); + RESERVED_KEYWORDS.add(TSPOLICY_KEY); } /** @@ -269,6 +284,8 @@ public class HTableDescriptor implements WritableComparable { * catalog tables, hbase:meta and -ROOT-. */ protected HTableDescriptor(final TableName name, HColumnDescriptor[] families) { + // set for newly created tables + setTimestampPolicy(DEFAULT_TSPOLICY); setName(name); for(HColumnDescriptor descriptor : families) { this.families.put(descriptor.getName(), descriptor); @@ -281,6 +298,8 @@ public class HTableDescriptor implements WritableComparable { */ protected HTableDescriptor(final TableName name, HColumnDescriptor[] families, Map values) { + // set for newly created tables + setTimestampPolicy(DEFAULT_TSPOLICY); setName(name); for(HColumnDescriptor descriptor : families) { this.families.put(descriptor.getName(), descriptor); @@ -308,6 +327,8 @@ public class HTableDescriptor implements WritableComparable { */ public HTableDescriptor(final TableName name) { super(); + // set for newly created tables + setTimestampPolicy(DEFAULT_TSPOLICY); setName(name); } @@ -638,6 +659,30 @@ public class HTableDescriptor implements WritableComparable { } /** + * Sets the {@link TimestampPolicy} setting for the table. This defaults to + * {@link TimestampPolicy#SERVER_TS}. + * @param tsPolicy enum value + */ + public void setTimestampPolicy(TimestampPolicy tsPolicy) { + setValue(TSPOLICY_KEY, tsPolicy.name()); + } + + /** + * Returns the timestamp policy setting for the table. + * @return timestamp policy setting for the table. + */ + public TimestampPolicy getTimestampPolicy() { + String tsPolicy = Bytes.toString(getValue(TSPOLICY_KEY)); + try { + return TimestampPolicy.valueOf(tsPolicy); + } catch (IllegalArgumentException ex) { + LOG.warn("Received " + ex + " because Timestamp Policy value for HTableDescriptor" + + " is not known. 
Policy:" + tsPolicy); + return DEFAULT_TSPOLICY; + } + } + + /** * Get the name of the table * * @return TableName @@ -959,6 +1004,7 @@ public class HTableDescriptor implements WritableComparable { setMetaRegion(in.readBoolean()); values.clear(); configuration.clear(); + setTimestampPolicy(LEGACY_TSPOLICY); int numVals = in.readInt(); for (int i = 0; i < numVals; i++) { ImmutableBytesWritable key = new ImmutableBytesWritable(); @@ -1478,6 +1524,10 @@ public class HTableDescriptor implements WritableComparable { HTableDescriptor htd = new HTableDescriptor( ProtobufUtil.toTableName(ts.getTableName()), hcds); + // Default to LEGACY_TSPOLICY here. + // If the table has a ts policy defined we'll + // overwrite it, otherwise we have a legacy table + htd.setTimestampPolicy(LEGACY_TSPOLICY); for (BytesBytesPair a: ts.getAttributesList()) { htd.setValue(a.getFirst().toByteArray(), a.getSecond().toByteArray()); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/InvalidTimestampException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/InvalidTimestampException.java new file mode 100644 index 0000000..5aa4cdb --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/InvalidTimestampException.java @@ -0,0 +1,35 @@ +package org.apache.hadoop.hbase.client; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hbase.DoNotRetryIOException; + +@SuppressWarnings("serial") +@InterfaceAudience.Public +@InterfaceStability.Evolving +public class InvalidTimestampException extends DoNotRetryIOException { + public InvalidTimestampException() { + super("The client provided a timestamp that not acceptable for the table's timestamp policy"); + } + /** + * @param message + */ + public InvalidTimestampException(String message) { + super(message); + } + + /** + * @param message + * @param cause + */ + public InvalidTimestampException(String 
message, Throwable cause) { + super(message, cause); + } + + /** + * @param cause + */ + public InvalidTimestampException(Throwable cause) { + super(cause); + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TimestampPolicy.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TimestampPolicy.java new file mode 100644 index 0000000..afbd5dc --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TimestampPolicy.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.client; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * Enum describing the timestamp policies for tables. + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public enum TimestampPolicy { + /* Developer note: Do not rename the enum field names. They are serialized in HTableDescriptor */ + /** + * Use the WAL edit seqNumId, see {@link HLogKey}. This is guaranteed to be monotonically + * increasing. Timestamps of this form will not conform to wall clock time, and features such as + * TTL will not work. 
+ * Clients cannot set custom timestamps on KeyValues. An attempt to do so will + * result in an exception. + */ + SEQ_ID(false, false, false), + + /** + * Only the server can set timestamps. + * Clients cannot set custom timestamps on KeyValues. An attempt to do so will + * result in an exception. + * This is the default in 1.0 and newer. All newly created tables will have this setting by default. + */ + SERVER_TS(false, true, true), + + /** + * Only the client will set timestamps on KeyValue. Failure to do so will result in an exception. + * Client promises to provide globally monotonically increasing timestamps. The server + * has no way to validate this. If the client breaks that promise data might be lost or previously + * deleted data might be visible again due to premature compactions. + * USE WITH CAUTION. + */ + CLIENT_MONOTONIC(true, false, true), + + /** + * The Client may set timestamps. If timestamps are not provided the server will generate one. + * This was the default in HBase 0.98 and older, and all tables created prior to 1.0 will continue + * to operate this way. + */ + MIXED(true, true, true); + + private boolean serverMaySet; + private boolean clientMaySet; + private boolean wallClockTime; + private TimestampPolicy(boolean clientMaySet, boolean serverMaySet, boolean wallClockTime) { + this.clientMaySet = clientMaySet; + this.serverMaySet = serverMaySet; + this.wallClockTime = wallClockTime; + } + + /** + * Determine whether a policy can be changed to another one. 
+ * @param from the policy we're trying to change from + * @param to the new desired policy + * @return true if the transition is OK, false otherwise + */ + public static boolean transitionOk(TimestampPolicy from, TimestampPolicy to) { + return from.wallClockTime == to.wallClockTime && (from.clientMaySet == to.clientMaySet || from.serverMaySet == to.serverMaySet); + } + public boolean clientMaySetTS() { + return clientMaySet; + } + public boolean serverMaySetTS() { + return serverMaySet; + } + public boolean isWallClockTime() { + return wallClockTime; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 6f74b4d..1567661 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -90,12 +90,14 @@ import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Increment; +import org.apache.hadoop.hbase.client.InvalidTimestampException; import org.apache.hadoop.hbase.client.IsolationLevel; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.RowMutations; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.TimestampPolicy; import org.apache.hadoop.hbase.coprocessor.RegionObserver; import org.apache.hadoop.hbase.errorhandling.ForeignExceptionSnare; import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException; @@ -521,6 +523,7 @@ public class HRegion implements HeapSize { // , Writable{ private final MetricsRegion metricsRegion; private final MetricsRegionWrapperImpl metricsRegionWrapper; private final Durability durability; + private final TimestampPolicy 
timestampPolicy; /** * HRegion constructor. This constructor should only be used for testing and @@ -636,6 +639,7 @@ public class HRegion implements HeapSize { // , Writable{ this.durability = htd.getDurability() == Durability.USE_DEFAULT ? DEFAULT_DURABLITY : htd.getDurability(); + this.timestampPolicy = htd.getTimestampPolicy(); if (rsServices != null) { this.rsAccounting = this.rsServices.getRegionServerAccounting(); // don't initialize coprocessors if not running within a regionserver @@ -2123,6 +2127,9 @@ public class HRegion implements HeapSize { // , Writable{ for (Cell cell: cells) { KeyValue kv = KeyValueUtil.ensureKeyValue(cell); + if ((!timestampPolicy.serverMaySetTS() && kv.isLatestTimestamp()) || (!timestampPolicy.clientMaySetTS() && !kv.isLatestTimestamp())) { + throw new InvalidTimestampException(); + } // Check if time is LATEST, change to time of most recent addition if so // This is expensive. if (kv.isLatestTimestamp() && CellUtil.isDeleteType(kv)) { @@ -2940,11 +2947,14 @@ public class HRegion implements HeapSize { // , Writable{ * Replaces any KV timestamps set to {@link HConstants#LATEST_TIMESTAMP} with the * provided current timestamp. 
*/ - void updateKVTimestamps(final Iterable> keyLists, final byte[] now) { + void updateKVTimestamps(final Iterable> keyLists, final byte[] now) throws IOException { for (List cells: keyLists) { if (cells == null) continue; for (Cell cell : cells) { KeyValue kv = KeyValueUtil.ensureKeyValue(cell); + if ((!timestampPolicy.serverMaySetTS() && kv.isLatestTimestamp()) || (!timestampPolicy.clientMaySetTS() && !kv.isLatestTimestamp())) { + throw new InvalidTimestampException(); + } kv.updateLatestStamp(now); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestTimestampPolicy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestTimestampPolicy.java new file mode 100644 index 0000000..617c2ae --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestTimestampPolicy.java @@ -0,0 +1,118 @@ +package org.apache.hadoop.hbase.client; + +import static org.junit.Assert.assertEquals; +import java.io.IOException; + +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +public class TestTimestampPolicy { + protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private final byte[] FAM = Bytes.toBytes("f"); + private final byte[] ROW = Bytes.toBytes("r"); + private final byte[] VAL = Bytes.toBytes("v"); + private final byte[] QUAL = Bytes.toBytes("q"); + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + TEST_UTIL.startMiniCluster(); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + @Test + public void testTableDescriptor() throws Exception { + TableName name = TableName.valueOf("testTableDescriptorDefault"); + 
createTableWithTSPolicy(name, null); + HTableDescriptor htd = TEST_UTIL.getHBaseAdmin().getTableDescriptor(name); + // default is SERVER_TS + assertEquals(TimestampPolicy.SERVER_TS, htd.getTimestampPolicy()); + for (TimestampPolicy p : TimestampPolicy.values()) { + name = TableName.valueOf("testTableDescriptor"+p.name()); + createTableWithTSPolicy(name, p); + htd = TEST_UTIL.getHBaseAdmin().getTableDescriptor(name); + assertEquals(p, htd.getTimestampPolicy()); + } + } + + @Test + public void testMutations() throws Exception { + for (TimestampPolicy p : TimestampPolicy.values()) { + TableName name = TableName.valueOf("testMutations"+p.name()); + createTableWithTSPolicy(name, p); + HTable t = new HTable(TEST_UTIL.getConfiguration(), name); + Put noTs = new Put(ROW); + noTs.add(FAM,QUAL,VAL); + Put withTs = new Put(ROW, System.currentTimeMillis()); + withTs.add(FAM,QUAL,VAL); + Put withTs1 = new Put(ROW); + withTs1.add(FAM,QUAL, System.currentTimeMillis() ,VAL); + + assertEquals(p.serverMaySetTS(), tryPut(t,noTs)); + assertEquals(p.clientMaySetTS(), tryPut(t,withTs)); + assertEquals(p.clientMaySetTS(), tryPut(t,withTs1)); + + Delete delColLatest = new Delete(ROW); + delColLatest.deleteColumn(FAM, QUAL); + Delete delColTs = new Delete(ROW); + delColTs.deleteColumn(FAM, QUAL, System.currentTimeMillis()); + Delete delColsNoTs = new Delete(ROW); + delColsNoTs.deleteColumns(FAM, QUAL); + Delete delColsTs = new Delete(ROW); + delColsTs.deleteColumns(FAM, QUAL, System.currentTimeMillis()); + Delete delFamTs = new Delete(ROW); + delFamTs.deleteFamily(FAM, System.currentTimeMillis()); + Delete delFamNoTs = new Delete(ROW); + delFamNoTs.deleteFamily(FAM); + + assertEquals(p.serverMaySetTS(), tryDelete(t, delColLatest)); + assertEquals(p.clientMaySetTS(), tryDelete(t, delColTs)); + assertEquals(p.serverMaySetTS(), tryDelete(t, delColsNoTs)); + assertEquals(p.clientMaySetTS(), tryDelete(t, delColsTs)); + assertEquals(p.serverMaySetTS(), tryDelete(t, delFamNoTs)); + 
assertEquals(p.clientMaySetTS(), tryDelete(t, delFamTs)); + t.close(); + } + } + + @Test + public void testAlter() throws Exception { + } + + private boolean tryPut(HTable t, Put p) { + try { + t.put(p); + return true; + } catch (IOException x) { + return false; + } + } + + private boolean tryDelete(HTable t, Delete d) { + try { + t.delete(d); + return true; + } catch (IOException x) { + return false; + } + } + + private void createTableWithTSPolicy(TableName name, TimestampPolicy policy) throws IOException { + HTableDescriptor htd = new HTableDescriptor(name); + htd.addFamily(new HColumnDescriptor(FAM)); + if (policy != null) { + htd.setTimestampPolicy(policy); + } + TEST_UTIL.getHBaseAdmin().createTable(htd); + TEST_UTIL.waitUntilAllRegionsAssigned(name); + } +} diff --git a/hbase-shell/src/main/ruby/hbase/admin.rb b/hbase-shell/src/main/ruby/hbase/admin.rb index 532f073..6f1e465 100644 --- a/hbase-shell/src/main/ruby/hbase/admin.rb +++ b/hbase-shell/src/main/ruby/hbase/admin.rb @@ -269,6 +269,7 @@ module Hbase htd.setMemStoreFlushSize(JLong.valueOf(arg.delete(MEMSTORE_FLUSHSIZE))) if arg[MEMSTORE_FLUSHSIZE] htd.setAsyncLogFlush(JBoolean.valueOf(arg.delete(DEFERRED_LOG_FLUSH))) if arg[DEFERRED_LOG_FLUSH] htd.setDurability(org.apache.hadoop.hbase.client.Durability.valueOf(arg.delete(DURABILITY))) if arg[DURABILITY] + htd.setTimestampPolicy(org.apache.hadoop.hbase.client.TimestampPolicy.valueOf(arg.delete(TSPOLICY))) if arg[TSPOLICY] set_user_metadata(htd, arg.delete(METADATA)) if arg[METADATA] set_descriptor_config(htd, arg.delete(CONFIGURATION)) if arg[CONFIGURATION]