diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java
index a0a3c79..be77907 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java
@@ -185,6 +185,18 @@ public class HTableDescriptor implements Comparable<HTableDescriptor> {
   private static final Bytes REGION_MEMSTORE_REPLICATION_KEY =
       new Bytes(Bytes.toBytes(REGION_MEMSTORE_REPLICATION));
 
+  /**
+   * The timestamp resolution to be used by this table.
+   * Can only be set at table creation time and cannot be updated afterwards.
+   * 1 means seconds, 1000 means milliseconds, 1000000 microseconds, and 1000000000 nanoseconds.
+   * Other multipliers are possible.
+   * Note that the actual resolution remains at milliseconds; this just makes
+   * space in the timestamp for additional bits.
+   */
+  public static final String TIMESTAMP_RESOLUTION = "TIMESTAMP_RESOLUTION";
+  private static final Bytes TIMESTAMP_RESOLUTION_KEY =
+      new Bytes(Bytes.toBytes(TIMESTAMP_RESOLUTION));
+
   /** Default durability for HTD is USE_DEFAULT, which defaults to HBase-global default value */
   private static final Durability DEFAULT_DURABLITY = Durability.USE_DEFAULT;
 
@@ -221,6 +233,11 @@ public class HTableDescriptor implements Comparable<HTableDescriptor> {
 
   public static final boolean DEFAULT_REGION_MEMSTORE_REPLICATION = true;
 
+  /**
+   * The default timestamp resolution: milliseconds.
+   */
+  public static final int DEFAULT_TIMESTAMP_RESOLUTION = 1000;
+
   private final static Map<String, String> DEFAULT_VALUES
     = new HashMap<String, String>();
   private final static Set<Bytes> RESERVED_KEYWORDS
@@ -236,6 +253,7 @@ public class HTableDescriptor implements Comparable<HTableDescriptor> {
         String.valueOf(DEFAULT_DEFERRED_LOG_FLUSH));
     DEFAULT_VALUES.put(DURABILITY, DEFAULT_DURABLITY.name()); //use the enum name
     DEFAULT_VALUES.put(REGION_REPLICATION, String.valueOf(DEFAULT_REGION_REPLICATION));
+    DEFAULT_VALUES.put(TIMESTAMP_RESOLUTION, String.valueOf(DEFAULT_TIMESTAMP_RESOLUTION));
     for (String s : DEFAULT_VALUES.keySet()) {
       RESERVED_KEYWORDS.add(new Bytes(Bytes.toBytes(s)));
     }
@@ -1554,4 +1572,33 @@ public class HTableDescriptor implements Comparable<HTableDescriptor> {
   public void removeConfiguration(final String key) {
     configuration.remove(key);
   }
+
+  /**
+   * Returns the timestamp resolution set for this table (or the default value).
+   *
+   * @return timestamp resolution for this table as a multiplier
+   *
+   * @see #setTimestampResolution(int)
+   */
+  public int getTimestampResolution() {
+    byte [] val = getValue(TIMESTAMP_RESOLUTION_KEY);
+    if (val == null || val.length == 0) {
+      return DEFAULT_TIMESTAMP_RESOLUTION;
+    }
+    return Integer.parseInt(Bytes.toString(val));
+  }
+
+  /**
+   * @param resolution The resolution to use.
+   *          1000 for milliseconds, 1000000 for microseconds, and so on.
+   */
+  public HTableDescriptor setTimestampResolution(int resolution) {
+    byte [] val = getValue(TIMESTAMP_RESOLUTION_KEY);
+    if (val != null) {
+      LOG.warn("\"" + TIMESTAMP_RESOLUTION + "\" parameter cannot be updated! Ignoring.", new Exception());
Ignore.", new Exception()); + return this; + } + setValue(TIMESTAMP_RESOLUTION_KEY, Integer.toString(resolution)); + return this; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 5c117f0..b419529 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -2843,7 +2843,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // we acquire at least one. // ---------------------------------- int numReadyToWrite = 0; - long now = EnvironmentEdgeManager.currentTime(); + long now = getNow(); while (lastIndexExclusive < batchOp.operations.length) { Mutation mutation = batchOp.getMutation(lastIndexExclusive); boolean isPutMutation = mutation instanceof Put; @@ -2935,9 +2935,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // we should record the timestamp only after we have acquired the rowLock, // otherwise, newer puts/deletes are not guaranteed to have a newer timestamp - now = EnvironmentEdgeManager.currentTime(); + now = getNow(); byte[] byteNow = Bytes.toBytes(now); - + long wallTime = EnvironmentEdgeManager.currentTime(); // Nothing to put/delete -- an exception in the above such as NoSuchColumnFamily? if (numReadyToWrite <= 0) return 0L; @@ -3037,7 +3037,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // txid should always increase, so having the one from the last call is ok. // we use HLogKey here instead of WALKey directly to support legacy coprocessors. walKey = new ReplayHLogKey(this.getRegionInfo().getEncodedNameAsBytes(), - this.htableDescriptor.getTableName(), now, m.getClusterIds(), + this.htableDescriptor.getTableName(), wallTime, m.getClusterIds(), currentNonceGroup, currentNonce); txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey, walEdit, getSequenceId(), true, null); @@ -3081,7 +3081,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi if (!isInReplay) { // we use HLogKey here instead of WALKey directly to support legacy coprocessors. walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(), - this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now, + this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, wallTime, mutation.getClusterIds(), currentNonceGroup, currentNonce); } @@ -7844,4 +7844,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi return this.getRegionInfo().isMetaRegion() ? CellComparator.META_COMPARATOR : CellComparator.COMPARATOR; } + + @Override + public long getNow() { + // note that this requires the timestamp resolution to dividable by 1000! 
+    return EnvironmentEdgeManager.currentTime()
+        * (getTableDesc().getTimestampResolution() / 1000);
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index d4157f9..eeba502 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -54,6 +54,7 @@ import org.apache.hadoop.hbase.CompoundConfiguration;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.TableName;
@@ -240,7 +241,7 @@ public class HStore implements Store {
     LOG.trace("Time to purge deletes set to " + timeToPurgeDeletes +
         "ms in store " + this);
     // Get TTL
-    long ttl = determineTTLFromFamily(family);
+    long ttl = determineTTLFromFamily(family, region.getTableDesc());
     // Why not just pass a HColumnDescriptor in here altogether? Even if have
     // to clone it?
     scanInfo = new ScanInfo(family, ttl, timeToPurgeDeletes, this.comparator);
@@ -347,7 +348,7 @@ public class HStore implements Store {
    * @param family
    * @return TTL in seconds of the specified family
    */
-  private static long determineTTLFromFamily(final HColumnDescriptor family) {
+  private static long determineTTLFromFamily(final HColumnDescriptor family, final HTableDescriptor table) {
     // HCD.getTimeToLive returns ttl in seconds.  Convert to milliseconds.
     long ttl = family.getTimeToLive();
     if (ttl == HConstants.FOREVER) {
@@ -357,7 +358,7 @@ public class HStore implements Store {
       ttl = Long.MAX_VALUE;
     } else {
       // Second -> ms adjust for user data
-      ttl *= 1000;
+      ttl *= table.getTimestampResolution();
     }
     return ttl;
   }
@@ -2407,4 +2408,9 @@ public class HStore implements Store {
   public double getCompactionPressure() {
     return storeEngine.getStoreFileManager().getCompactionPressure();
   }
+
+  @Override
+  public long getNow() {
+    return getHRegion().getNow();
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
index 6470e7f..41da7c9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
@@ -679,4 +679,12 @@ public interface Region extends ConfigurationObserver {
 
   /** Wait for all current flushes and compactions of the region to complete */
   void waitForFlushesAndCompactions();
+  /**
+   * Get the current time as appropriate for this region.
+   * This might be the current time scaled by a scaling factor,
+   * or something not related to wall-clock time at all.
+   *
+   * @return the timestamp to use
+   */
+  long getNow();
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
index 3b169ad..a96dbec 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
@@ -442,4 +442,13 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf
   void refreshStoreFiles(Collection<String> newFiles) throws IOException;
 
   void bulkLoadHFile(StoreFileInfo fileInfo) throws IOException;
+
+  /**
+   * Get the current time as appropriate for this store.
+   * This might be the current time scaled by a scaling factor,
+   * or something not related to wall-clock time at all.
+   *
+   * @return the timestamp to use
+   */
+  long getNow();
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
index 3926902..613d065 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
@@ -141,7 +141,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
     explicitColumnQuery = numCol > 0;
     this.scan = scan;
     this.columns = columns;
-    this.now = EnvironmentEdgeManager.currentTime();
+    this.now = store.getNow();
     this.oldestUnexpiredTS = now - ttl;
     this.minVersions = minVersions;
 
diff --git a/hbase-shell/src/main/ruby/hbase/admin.rb b/hbase-shell/src/main/ruby/hbase/admin.rb
index e10e2be..851fe1c 100644
--- a/hbase-shell/src/main/ruby/hbase/admin.rb
+++ b/hbase-shell/src/main/ruby/hbase/admin.rb
@@ -296,6 +296,7 @@ module Hbase
         htd.setReadOnly(JBoolean.valueOf(arg.delete(READONLY))) if arg[READONLY]
         htd.setCompactionEnabled(JBoolean.valueOf(arg[COMPACTION_ENABLED])) if arg[COMPACTION_ENABLED]
         htd.setMemStoreFlushSize(JLong.valueOf(arg.delete(MEMSTORE_FLUSHSIZE))) if arg[MEMSTORE_FLUSHSIZE]
+        htd.setTimestampResolution(JInteger.valueOf(arg.delete(TIMESTAMP_RESOLUTION))) if arg[TIMESTAMP_RESOLUTION]
        # DEFERRED_LOG_FLUSH is deprecated and was replaced by DURABILITY. To keep backward compatible, it still exists.
        # However, it has to be set before DURABILITY so that DURABILITY could overwrite if both args are set
        if arg.include?(DEFERRED_LOG_FLUSH)
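
For reviewers, a minimal sketch of how the new table attribute would be exercised from client code, assuming a running cluster. The table name "t1", family "f", and the resolution value are placeholders; only setTimestampResolution comes from this patch, everything else is the stock 1.x client API.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class TimestampResolutionExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    try (Connection connection = ConnectionFactory.createConnection(conf);
         Admin admin = connection.getAdmin()) {
      // Create a table whose cell timestamps leave room for microsecond-style values:
      // server-side "now" becomes wall-clock milliseconds scaled by resolution / 1000.
      HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("t1"));
      htd.addFamily(new HColumnDescriptor("f"));
      htd.setTimestampResolution(1000000); // must be set at creation time, per the patch
      admin.createTable(htd);
    }
  }
}

The admin.rb hook above suggests the same attribute would also be settable from the shell at table creation, though the exact shell syntax depends on how the TIMESTAMP_RESOLUTION constant is exposed there.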