diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java index ccad414..4eb845e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java @@ -92,6 +92,12 @@ public class HTableDescriptor implements Comparable { new Bytes(Bytes.toBytes(OWNER)); /** + * Clock type for the table which defines the semantics of the timestamps used by the table. + */ + public static final String CLOCK_TYPE = "CLOCK_TYPE"; + private static final Bytes CLOCK_TYPE_KEY = new Bytes(Bytes.toBytes(CLOCK_TYPE)); + + /** * INTERNAL Used by rest interface to access this metadata * attribute which denotes if the table is Read Only * @@ -227,6 +233,17 @@ public class HTableDescriptor implements Comparable { public static final boolean DEFAULT_NORMALIZATION_ENABLED = false; /** + * We shall default the tables to System clock until the HLC is completely introduced. + */ + public static final ClockType DEFAULT_CLOCK_TYPE = ClockType.SYSTEM; + + /** + * If clock type is not specified in the HTableDescription, than it means that it is a table + * before Clock type field is introduced. We default to System type for these tables. + */ + public static final ClockType DEFAULT_UNSET_CLOCK_TYPE = ClockType.SYSTEM; + + /** * Constant that denotes the maximum default size of the memstore after which * the contents are flushed to the store files */ @@ -802,6 +819,32 @@ public class HTableDescriptor implements Comparable { } /** + * Sets the Clock type for this table. + * + * @param clockType the timestamp implementation + * @return object of type {@link HTableDescriptor} + * @see ClockType + */ + public HTableDescriptor setClockType(ClockType clockType) { + setValue(CLOCK_TYPE, clockType.name()); + return this; + } + + /** + * Returns the timestamp type for this table. + * + * @return a Timestamp implementation. + * @see ClockType + */ + public ClockType getClockType() { + String name = getValue(CLOCK_TYPE); + if (name == null || name.isEmpty()) { + return DEFAULT_UNSET_CLOCK_TYPE; + } + return ClockType.valueOf(name); + } + + /** * Returns the size of the memstore after which a flush to filesystem is triggered. * * @return memory cache flush size for each hregion, -1 if not set. diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/Clock.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/Clock.java new file mode 100644 index 0000000..07b8b60 --- /dev/null +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/Clock.java @@ -0,0 +1,370 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.exceptions.HBaseException; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import static org.apache.hadoop.hbase.util.AtomicUtils.updateMax; + +/** + * A clock is an implementation of an algorithm to get timestamps corresponding to one of the + * {@link TimestampType}s for the current time. Different clock implementations can have + * different semantics associated with them. Every such clock should be able to map its + * representation of time to one of the {link TimestampType}s. + * HBase has traditionally been using the {@link java.lang.System#currentTimeMillis()} to + * timestamp events in HBase. {@link java.lang.System#currentTimeMillis()} does not give any + * guarantees about monotonicity of time. We will keep this implementation of clock in place for + * backward compatibility and call it SYSTEM clock. + * It is easy to provide monotonically non decreasing time semantics by keeping track of the last + * timestamp given by the clock and updating it on receipt of external message. This + * implementation of clock is called SYSTEM_MONOTONIC. + * SYSTEM Clock and SYSTEM_MONOTONIC clock as described above, both being physical clocks, they + * cannot track causality. Hybrid Logical Clocks(HLC), as described in + * HLC Paper, helps tracking + * causality using a + * Logical + * Clock but always keeps the logical time close to the wall time or physical time. It kind + * of has the advantages of both the worlds. One such advantage being getting consistent + * snapshots in physical time as described in the paper. Hybrid Logical Clock has an additional + * advantage that it is always monotonically increasing. + * Note: It is assumed that any physical clock implementation has millisecond resolution else the + * {@link TimestampType} implementation has to changed to accommodate it. It is decided after + * careful discussion to go with millisecond resolution in the HLC design document attached in the + * issue HBASE-14070 . + */ +@InterfaceStability.Evolving public abstract class Clock { + private static final Log LOG = LogFactory.getLog(Clock.class); + + protected PhysicalClock physicalClock; + protected TimestampType timestampType; + public ClockType clockType; + + Clock(PhysicalClock physicalClock) { + this.physicalClock = physicalClock; + } + + /** + * Indicates that Physical Time or Logical Time component has overflowed. This extends + * RuntimeException. + */ + @SuppressWarnings("serial") public static class ClockException extends RuntimeException { + public ClockException(String msg) { + super(msg); + } + } + + /** + * Indicates that clock update cannot be performed because the clock skew is greater than allowed. + */ + @SuppressWarnings("serial") public static class MaxClockSkewException extends HBaseException { + public MaxClockSkewException(String msg) { + super(msg); + } + } + + /** + * This is a method to get the current time. + * + * @return Timestamp of current time in 64 bit representation corresponding to the particular + * clock + */ + public abstract long now() throws RuntimeException, HBaseException; + + /** + * @return true if the clock implementation gives monotonically non decreasing timestamps else + * false. + */ + public abstract boolean isMonotonic(); + + /** + * @return true if the clock implementation gives monotonically increasing timestamps else false. + */ + public abstract boolean isMonotonicallyIncreasing(); + + interface Monotonic { + // This is currently equal to the HBase default. + long DEFAULT_MAX_CLOCK_SKEW = 30000; + + /** + * This is a method to update the local clock on receipt of a timestamped message from + * the external world. + * + * @param timestamp The timestamp present in the message received by the node from outside. + */ + long update(long timestamp) throws RuntimeException, HBaseException; + } + + public interface PhysicalClock { + /** + * This is a method to get the current time. + * + * @return Timestamp of current time in 64 bit representation corresponding to the particular + * clock + */ + long now() throws RuntimeException; + + /** + * This is a method to get the unit of the physical time used by the clock + * + * @return A {@link TimeUnit} + */ + TimeUnit getTimeUnit(); + } + + public static class JavaMillisPhysicalClock implements PhysicalClock { + @Override public long now() { + return EnvironmentEdgeManager.currentTime(); + } + + @Override public TimeUnit getTimeUnit() { + return TimeUnit.MILLISECONDS; + } + } + + /** + * Returns the default physical clock used in HBase. It is currently based on + * {@link java.lang.System#currentTimeMillis()} + * + * @return the default PhysicalClock + */ + public static PhysicalClock getDefaultPhysicalClock() { + return new JavaMillisPhysicalClock(); + } + + /** + * System clock is an implementation of clock which doesn't give any monotonic guarantees. + */ + public static class System extends Clock implements PhysicalClock { + + public System() { + super(getDefaultPhysicalClock()); + this.timestampType = TimestampType.PHYSICAL; + this.clockType = ClockType.SYSTEM; + } + + @Override public long now() { + return physicalClock.now(); + } + + @Override public boolean isMonotonic() { + return false; + } + + @Override public boolean isMonotonicallyIncreasing() { + return false; + } + + public TimeUnit getTimeUnit() { + return physicalClock.getTimeUnit(); + } + } + + /** + * System clock is an implementation of clock which guarantees monotonically non-decreasing + * timestamps. + */ + public static class SystemMonotonic extends Clock implements Monotonic, PhysicalClock { + private long maxClockSkew; + private static final long OFFSET = 5000; + AtomicLong physicalTime = new AtomicLong(); + + public SystemMonotonic(PhysicalClock physicalClock, long maxClockSkew) { + super(physicalClock); + this.maxClockSkew = maxClockSkew > 0 ? maxClockSkew : DEFAULT_MAX_CLOCK_SKEW; + this.timestampType = TimestampType.PHYSICAL; + this.clockType = ClockType.SYSTEM_MONOTONIC; + } + + public SystemMonotonic() { + super(getDefaultPhysicalClock()); + this.maxClockSkew = DEFAULT_MAX_CLOCK_SKEW; + this.timestampType = TimestampType.PHYSICAL; + this.clockType = ClockType.SYSTEM_MONOTONIC; + } + + @Override public long now() { + long systemTime = physicalClock.now(); + updateMax(physicalTime, systemTime); + return physicalTime.get(); + } + + public long update(long messageTimestamp) throws MaxClockSkewException { + long systemTime = physicalClock.now(); + if (maxClockSkew > 0 && (messageTimestamp - systemTime) > maxClockSkew) { + throw new MaxClockSkewException( + "Received event with timestamp:" + timestampType.toString(messageTimestamp) + + " which is greater than allowed clock skew "); + } + long physicalTime_ = systemTime > messageTimestamp ? systemTime : messageTimestamp; + updateMax(physicalTime, physicalTime_); + return physicalTime.get(); + } + + @Override public boolean isMonotonic() { + return true; + } + + @Override public boolean isMonotonicallyIncreasing() { + return false; + } + + public TimeUnit getTimeUnit() { + return physicalClock.getTimeUnit(); + } + + @VisibleForTesting void setPhysicalTime(long time) { + physicalTime.set(time); + } + } + + public static class HLC extends Clock implements Monotonic, PhysicalClock { + private long maxClockSkew; + private long physicalTime; + private long logicalTime; + private long maxPhysicalTime; + private long maxLogicalTime; + + public HLC(PhysicalClock physicalClock, long maxClockSkew) { + super(physicalClock); + this.maxClockSkew = maxClockSkew > 0 ? maxClockSkew : DEFAULT_MAX_CLOCK_SKEW; + this.maxPhysicalTime = timestampType.getMaxPhysicalTime(); + this.maxLogicalTime = timestampType.getMaxLogicalTime(); + this.timestampType = TimestampType.HYBRID; + this.physicalTime = 0; + this.logicalTime = 0; + this.clockType = ClockType.HLC; + } + + public HLC() { + super(getDefaultPhysicalClock()); + this.maxClockSkew = DEFAULT_MAX_CLOCK_SKEW; + this.maxPhysicalTime = timestampType.HYBRID.getMaxPhysicalTime(); + this.maxLogicalTime = timestampType.HYBRID.getMaxLogicalTime(); + this.timestampType = TimestampType.HYBRID; + this.physicalTime = 0; + this.logicalTime = 0; + this.clockType = ClockType.HLC; + } + + @Override public synchronized long now() throws ClockException { + long systemTime = physicalClock.now(); + long physicalTime_ = physicalTime; + if (systemTime >= maxPhysicalTime) { + // Extremely unlikely to happen, if this happens upper layers may have to kill the server. + throw new ClockException( + "PT overflowed: " + systemTime + " and max physical time:" + maxPhysicalTime); + } + + if (systemTime > physicalTime_) physicalTime = systemTime; + + if (physicalTime == physicalTime_) { + logicalTime++; + } else { + logicalTime = 0; + } + if (logicalTime >= maxLogicalTime) { + // highly unlikely to happen, when it happens, we throw exception for the above layer to + // handle. + throw new ClockException( + "Logical Time Overflowed: " + logicalTime + "max " + "logical " + "time:" + + maxLogicalTime); + } + return toTimestamp(); + } + + /** + * Updates {@link HLC} with the given timestamp received from elsewhere (possibly + * some other node). Returned timestamp is strict greater than msgTimestamp and local + * timestamp. + * + * @param messageTimestamp timestamp from the external message. + * @return a hybrid timestamp of HLC that is strictly greater than local timestamp and + * msgTimestamp + * @throws ClockException + * @throws MaxClockSkewException + */ + @Override public synchronized long update(long messageTimestamp) + throws ClockException, MaxClockSkewException { + long messagePhysicalTime = timestampType.getPhysicalTime(messageTimestamp); + long messageLogicalTime = timestampType.getLogicalTime(messageTimestamp); + // variable to keep old physical time when we update it. + long physicalTime_ = physicalTime; + long systemTime = physicalClock.now(); + + physicalTime = Math.max(Math.max(physicalTime_, messagePhysicalTime), systemTime); + + if (physicalTime_ >= maxPhysicalTime) { + // Extremely unlikely to happen, if this happens upper layers may have to kill the server. + throw new ClockException( + "Physical Time overflowed: " + systemTime + " and max physical time:" + + maxPhysicalTime); + } else if (messagePhysicalTime - physicalTime_ > maxClockSkew) { + throw new MaxClockSkewException( + "Received event with timestamp:" + timestampType.toString(messageTimestamp) + + " which is greater than allowed clock skew "); + } else if (physicalTime == physicalTime_ && physicalTime_ == messagePhysicalTime) { + logicalTime = Math.max(logicalTime, messageLogicalTime) + 1; + } else if (physicalTime == messagePhysicalTime) { + logicalTime = messageLogicalTime + 1; + } else if (physicalTime == physicalTime_) { + logicalTime++; + } else { + logicalTime = 0; + } + + if (logicalTime >= maxLogicalTime) { + // highly unlikely to happen, when it happens, we throw exception for the above layer to + // handle it the way they wish to. + throw new ClockException( + "Logical Time Overflowed: " + logicalTime + "max " + "logical time: " + maxLogicalTime); + } + return toTimestamp(); + } + + @Override public boolean isMonotonic() { + return true; + } + + @Override public boolean isMonotonicallyIncreasing() { + return true; + } + + public TimeUnit getTimeUnit() { + return physicalClock.getTimeUnit(); + } + + public long toTimestamp() { + return timestampType.toTimestamp(getTimeUnit(), physicalTime, logicalTime); + } + + @VisibleForTesting synchronized void setLogicalTime(long logicalTime) { + this.logicalTime = logicalTime; + } + + @VisibleForTesting synchronized void setPhysicalTime(long physicalTime) { + this.physicalTime = physicalTime; + } + } +} diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/ClockType.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/ClockType.java new file mode 100644 index 0000000..b5e7922 --- /dev/null +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/ClockType.java @@ -0,0 +1,24 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; + +@InterfaceAudience.Public @InterfaceStability.Evolving public enum ClockType { + SYSTEM, SYSTEM_MONOTONIC, HLC +} diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/TimestampType.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/TimestampType.java new file mode 100644 index 0000000..a4e6159 --- /dev/null +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/TimestampType.java @@ -0,0 +1,311 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase; + +import org.apache.commons.lang.time.FastDateFormat; + +import java.util.TimeZone; +import java.util.concurrent.TimeUnit; + +/** + * {@link TimestampType} is an enum to represent different ways of encoding time in HBase using + * 64 bits. Time is usually encoded as a 64-bit long in {@link org.apache.hadoop.hbase.Cell} + * timestamps and is used for sorting {@link org.apache.hadoop.hbase.Cell}s, ordering writes etc. + * It has methods which help in constructing or interpreting the 64 bit timestamp and getter + * methods to read the hard coded constants of the particular {@link TimestampType}. + * + *

+ * Enum {@link TimestampType} is dumb in a way. It doesn't have any logic other than interpreting + * the 64 bits. Any monotonically increasing or monotonically non-decreasing semantics of the + * timestamps are the responsibility of the clock implementation generating the particular + * timestamps. There can be several clock implementations, and each such implementation can map + * its representation of the timestamp to one of the available Timestamp types i.e. + * {@link #HYBRID} or {@link #PHYSICAL}. In essence, the {@link TimestampType} is only used + * internally by the Clock implementations and thus never exposed to the user. The user has to + * know only the different available clock types. So, for the user timestamp types do not exist. + *

+ */ + +enum TimestampType { + /** + * Hybrid is a Timestamp type used to encode both physical time and logical time components + * into a single. 64 bits long integer. It has methods to decipher the 64 bits hybrid timestamp + * and also to construct the hybrid timestamp. + */ + HYBRID { + /** + * Hard coded 44-bits for physical time, with most significant bit carrying the sign i.e 0 + * as we are dealing with positive integers and the remaining 43 bits are to be interpreted as + * system time in milli seconds. See + * HBASE-14070 for + * understanding the choice of going with the millisecond resolution for physical time. + * Thus allowing us to represent all the dates between unix epoch (1970) and year 2248 with + * signed timestamp comparison with 44 bits for physical time assuming a millisecond + * resolution with signed long integers. Picking 42 bits to represent the physical time has + * the problem of representing time until 2039 only, with signed integers, might cause Y2k39 + * bug hoping HBase to be around till then. The trade-off here is with the year until we can + * represent the physical time vs if we are able capture all the events in the worst case + * (read: leap seconds etc) without the logical component of the timestamp overflowing. With + * 20 bits for logical time, one can represent upto one million events at the same + * millisecond. In case of leap seconds, the no of events happening in the same second is very + * unlikely to exceed one million. + */ + @SuppressWarnings("unused") private static final int BITS_FOR_PHYSICAL_TIME = 44; + + /** + * Remaining 20-bits for logical time, allowing values up to 1,048,576. Logical Time is the + * least significant part of the 64 bit timestamp, so unsigned comparison can be used for LT. + */ + + private static final int BITS_FOR_LOGICAL_TIME = 20; + + /** + * Max value for physical time in the {@link #HYBRID} timestamp representation, inclusive. + * This assumes signed comparison. + */ + private static final long PHYSICAL_TIME_MAX_VALUE = 0x7ffffffffffL; + + /** + * Max value for logical time in the {@link #HYBRID} timestamp representation + */ + static final long LOGICAL_TIME_MAX_VALUE = 0xfffffL; + + public long toEpochTimeMillisFromTimestamp(long timestamp) { + return getPhysicalTime(timestamp); + } + + long fromEpochTimeMillisToTimestamp(long timestamp) { + return toTimestamp(TimeUnit.MILLISECONDS, timestamp, 0); + } + + long toTimestamp(TimeUnit timeUnit, long physicalTime, long logicalTime) { + physicalTime = TimeUnit.MILLISECONDS.convert(physicalTime, timeUnit); + return (physicalTime << BITS_FOR_LOGICAL_TIME) + logicalTime; + } + + long getPhysicalTime(long timestamp) { + return timestamp >>> BITS_FOR_LOGICAL_TIME; // assume unsigned timestamp + } + + long getLogicalTime(long timestamp) { + return timestamp & LOGICAL_TIME_MAX_VALUE; + } + + long getMaxPhysicalTime() { + return PHYSICAL_TIME_MAX_VALUE; + } + + long getMaxLogicalTime() { + return LOGICAL_TIME_MAX_VALUE; + } + + int getBitsForLogicalTime() { + return BITS_FOR_LOGICAL_TIME; + } + + /** + * Returns whether the given timestamp is "likely" of {@link #HYBRID} {@link TimestampType}. + * Timestamp implementations can use the full range of 64bits long to represent physical and + * logical components of time. However, this method returns whether the given timestamp is a + * likely representation depending on heuristics for the clock implementation. + * + * Hybrid timestamps are checked whether they belong to Hybrid range assuming + * that Hybrid timestamps will only have > 0 logical time component for timestamps + * corresponding to years after 2016. This method will return false if lt > 0 and year is + * before 2016. Due to left shifting for Hybrid time, all millisecond-since-epoch timestamps + * from years 1970-10K fall into + * year 1970 when interpreted as Hybrid timestamps. Thus, {@link #isLikelyOfType(long, boolean)} will + * return false for timestamps which are in the year 1970 and logical time = 0 when + * interpreted as of type Hybrid Time. + * + *

+ * Note that this method uses heuristics which may not hold + * if system timestamps are intermixed from client side and server side or timestamp + * sources other than system clock are used. + *

+ * @param timestamp {@link #HYBRID} Timestamp + * @param isClockMonotonic if the clock that generated this timestamp is monotonic + * @return true if the timestamp is likely to be of the corresponding {@link TimestampType} + * else false + */ + boolean isLikelyOfType(long timestamp, boolean isClockMonotonic) { + long physicalTime = getPhysicalTime(timestamp); + long logicalTime = getLogicalTime(timestamp); + + // heuristic 1: Up until year 2016 (1451635200000), lt component cannot be non-zero. + if (physicalTime < 1451635200000L && logicalTime != 0) { + return false; + } else if (physicalTime < 31536000000L) { + // heuristic 2: Even if logical time = 0, physical time after left shifting by 20 bits, + // will be before year 1971(31536000000L), as after left shifting by 20, all epoch ms + // timestamps from wall time end up in year less than 1971, even for epoch time for the + // year 10000. This assumes Hybrid time is not used to represent timestamps for year 1970 + // UTC. + return false; + } + return true; + } + + /** + * Returns a string representation for Physical Time and Logical Time components. The format is: + * yyyy-MM-dd HH:mm:ss:SSS(Physical Time),Logical Time + * Physical Time is converted to UTC time and not to local time for uniformity. + * Example: 2015-07-17 16:56:35:891(1437177395891), 0 + * @param timestamp A {@link #HYBRID} Timestamp + * @return A date time string formatted as mentioned in the method description + */ + String toString(long timestamp) { + long physicalTime = getPhysicalTime(timestamp); + long logicalTime = getLogicalTime(timestamp); + return new StringBuilder().append(dateFormat.format(physicalTime)).append("(") + .append(physicalTime).append(")").append(", ").append(logicalTime).toString(); + } + }, + + /** + * Physical is a Timestamp type used to encode the physical time in 64 bits. + * It has helper methods to decipher the 64 bit encoding of physical time. + */ + PHYSICAL { + long toEpochTimeMillisFromTimestamp(long timestamp) { + return timestamp; + } + + long fromEpochTimeMillisToTimestamp(long timestamp) { + return timestamp; + } + + long toTimestamp(TimeUnit timeUnit, long physicalTime, long logicalTime) { + return TimeUnit.MILLISECONDS.convert(physicalTime, timeUnit); + } + + long getPhysicalTime(long timestamp) { + return timestamp; + } + + long getLogicalTime(long timestamp) { + return 0; + } + + long getMaxPhysicalTime() { + return Long.MAX_VALUE; + } + + long getMaxLogicalTime() { + return 0; + } + + int getBitsForLogicalTime() { + return 0; + } + + boolean isLikelyOfType(long timestamp, boolean isClockMonotonic) { + // heuristic: the timestamp should be up to year 3K (32503680000000L). + if (!isClockMonotonic) { + return true; + } + return timestamp < 32503680000000L; + } + + /** + * Returns a string representation for Physical Time and Logical Time components. The format is: + * yyyy-MM-dd HH:mm:ss:SSS(Physical Time) + * Physical Time is converted to UTC time and not to local time for uniformity. + * Example: 2015-07-17 16:56:35:891(1437177395891), 0 + * @param timestamp epoch time in milliseconds + * @return A date time string formatted as mentioned in the method description + */ + String toString(long timestamp) { + long physicalTime = timestamp; + return new StringBuilder().append(dateFormat.format(physicalTime)).append("(") + .append(physicalTime).append(")").append(", ").append("0").toString(); + } + }; + + /** + * This is used internally by the enum methods of Hybrid and Physical Timestamp types to + * convert the + * timestamp to the format set here. UTC timezone instead of local time zone for convenience + * and uniformity + */ + private static final FastDateFormat dateFormat = + FastDateFormat.getInstance("yyyy-MM-dd'T'HH:mm:ss:SSS", TimeZone.getTimeZone("UTC")); + + /** + * Converts the given timestamp to the unix epoch timestamp with millisecond resolution. + * Returned timestamp is compatible with System.currentTimeMillis(). + * @param timestamp {@link #HYBRID} or {@link #PHYSICAL} Timestamp + * @return number of milliseconds from epoch + */ + abstract long toEpochTimeMillisFromTimestamp(long timestamp); + + /** + * Converts the given time in milliseconds to the corresponding {@link TimestampType} + * representation. + * @param timeInMillis epoch time in {@link TimeUnit#MILLISECONDS} + * @return a timestamp representation corresponding to {@link TimestampType}. + */ + abstract long fromEpochTimeMillisToTimestamp(long timeInMillis); + + /** + * Converts the given physical clock in the given {@link TimeUnit} to a 64-bit timestamp + * @param timeUnit a time unit as in the enum {@link TimeUnit} + * @param physicalTime physical time + * @param logicalTime logical time + * @return a timestamp in 64 bits + */ + abstract long toTimestamp(TimeUnit timeUnit, long physicalTime, long logicalTime); + + /** + * Extracts and returns the physical time from the timestamp + * @param timestamp {@link #HYBRID} or {@link #PHYSICAL} Timestamp + * @return physical time in {@link TimeUnit#MILLISECONDS} + */ + abstract long getPhysicalTime(long timestamp); + + /** + * Extracts and returns the logical time from the timestamp + * @param timestamp {@link #HYBRID} or {@link #PHYSICAL} Timestamp + * @return logical time + */ + abstract long getLogicalTime(long timestamp); + + /** + * @return the maximum possible physical time in {@link TimeUnit#MILLISECONDS} + */ + abstract long getMaxPhysicalTime(); + + /** + * @return the maximum possible logical time + */ + abstract long getMaxLogicalTime(); + + /** + * @return number of least significant bits allocated for logical time + */ + abstract int getBitsForLogicalTime(); + + /** + * @param timestamp epoch time in milliseconds + * @param isClockMonotonic if the clock that generated this timestamp is monotonic + * @return True if the timestamp generated by the clock is of type {@link #PHYSICAL} else False + */ + abstract boolean isLikelyOfType(long timestamp, boolean isClockMonotonic); + + abstract String toString(long timestamp); + +} diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/TestClock.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/TestClock.java new file mode 100644 index 0000000..fd284fe --- /dev/null +++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/TestClock.java @@ -0,0 +1,289 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase; + +import org.junit.Test; + +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class TestClock { + + private static final long OFFSET = 5000; + + // All Clocks Tests + + /** + * Remove this test if moving away from millis resolution for physical time. Be sure to change + * {@link TimestampType} methods which assume millisecond resolution. + */ + @Test public void TestClocksPhysicalTimeResolution() { + Clock.System systemClock = new Clock.System(); + Clock.SystemMonotonic systemMonotonicClock = new Clock.SystemMonotonic(); + Clock.HLC hybridLogicalClock = new Clock.HLC(); + assertTrue(systemClock.getTimeUnit() == systemMonotonicClock.getTimeUnit() + && systemClock.getTimeUnit() == hybridLogicalClock.getTimeUnit() + && TimeUnit.MILLISECONDS == systemClock.getTimeUnit()); + } + + // All System Clock Tests + @Test public void TestSystemClockIsMonotonic() { + Clock.System systemClock = new Clock.System(); + assertFalse(systemClock.isMonotonic()); + } + + @Test public void testSystemClockIsMonotonicallyIncreasing() { + Clock.System systemClock = new Clock.System(); + assertFalse(systemClock.isMonotonicallyIncreasing()); + } + + // All System Monotonic Clock Tests + + @Test public void testSystemMonotonicClockNow() { + assertTrue(true); + } + + @Test public void testSystemMonotonicClockUpdate() { + assertTrue(true); + } + + @Test public void testSystemMonotonicClockIsMonotonic() { + Clock.SystemMonotonic systemMonotonicClock = new Clock.SystemMonotonic(); + assertTrue(systemMonotonicClock.isMonotonic()); + } + + @Test public void testSystemMonotonicClockIsMonotonicallyIncreasing() { + Clock.SystemMonotonic systemMonotonicClock = new Clock.SystemMonotonic(); + assertFalse(systemMonotonicClock.isMonotonicallyIncreasing()); + } + + // All Hybrid Logical Clock Tests + @Test public void testHLCIsMonotonic() { + Clock.HLC hybridLogicalClock = new Clock.HLC(); + assertTrue(hybridLogicalClock.isMonotonic()); + } + + @Test public void testHLCIsMonotonicallyIncreasing() { + Clock.HLC hybridLogicalClock = new Clock.HLC(); + assertTrue(hybridLogicalClock.isMonotonicallyIncreasing()); + } + + @Test public void testSystemMonotonicNow() { + Clock.SystemMonotonic systemMonotonicClock = new Clock.SystemMonotonic(); + final long currentTime = System.currentTimeMillis(); + systemMonotonicClock.setPhysicalTime(currentTime + OFFSET); + assertTrue(systemMonotonicClock.now() == currentTime + OFFSET); + + systemMonotonicClock.setPhysicalTime(currentTime - OFFSET); + assertTrue(systemMonotonicClock.now() >= currentTime); + + } + + @Test public void testSystemMonotonicUpdate() { + Clock.SystemMonotonic systemMonotonicClock = new Clock.SystemMonotonic(); + final long currentTime = System.currentTimeMillis(); + long messageTimestamp; + + // case 1: Message Timestamp ahead of current time and last physical timestamp. + messageTimestamp = currentTime + 2*OFFSET; + systemMonotonicClock.setPhysicalTime(currentTime + OFFSET); + try { + assertTrue(systemMonotonicClock.update(messageTimestamp) == messageTimestamp); + } catch (Clock.MaxClockSkewException e) { + assertFalse(true); + } + + messageTimestamp = currentTime + 2*OFFSET; + systemMonotonicClock.setPhysicalTime(currentTime - OFFSET); + try { + assertTrue(systemMonotonicClock.update(messageTimestamp) == messageTimestamp); + } catch (Clock.MaxClockSkewException e) { + assertFalse(true); + } + + messageTimestamp = currentTime + 10*OFFSET; + systemMonotonicClock.setPhysicalTime(currentTime + OFFSET); + try { + systemMonotonicClock.update(messageTimestamp); + assertTrue(false); + } catch (Clock.MaxClockSkewException e) { + assertTrue(true); + } + + // case 2: last physical timestamp is greater than current time and message timestamp + messageTimestamp = currentTime + OFFSET; + systemMonotonicClock.setPhysicalTime(currentTime + 2*OFFSET); + try { + assertTrue(systemMonotonicClock.update(messageTimestamp) == currentTime + 2*OFFSET); + } catch (Clock.MaxClockSkewException e) { + assertFalse(true); + } + + messageTimestamp = currentTime - OFFSET; + systemMonotonicClock.setPhysicalTime(currentTime + 2*OFFSET); + try { + assertTrue(systemMonotonicClock.update(messageTimestamp) == currentTime + 2*OFFSET); + } catch (Clock.MaxClockSkewException e) { + assertFalse(true); + } + + // case 3: current time is greater than last physical timestamp and message timestamp + messageTimestamp = currentTime - OFFSET; + systemMonotonicClock.setPhysicalTime(currentTime - 2*OFFSET); + try { + assertTrue(systemMonotonicClock.update(messageTimestamp) >= currentTime); + } catch (Clock.MaxClockSkewException e) { + assertFalse(true); + } + + messageTimestamp = currentTime - 2*OFFSET; + systemMonotonicClock.setPhysicalTime(currentTime - OFFSET); + try { + assertTrue(systemMonotonicClock.update(messageTimestamp) >= currentTime); + } catch (Clock.MaxClockSkewException e) { + assertFalse(true); + } + } + + @Test public void testHLCNow() { + Clock.HLC hybridLogicalClock = new Clock.HLC(); + final long currentTime = System.currentTimeMillis(); + // case 1: Test Physical Time updates. + hybridLogicalClock.setPhysicalTime(currentTime + OFFSET); + hybridLogicalClock.setLogicalTime(1); + assertTrue(hybridLogicalClock.now() == TimestampType.HYBRID + .toTimestamp(TimeUnit.MILLISECONDS, currentTime + OFFSET, 2)); + + // case 2: current physical time is greater than last physical time component of HLC. + hybridLogicalClock.setPhysicalTime(currentTime - OFFSET); + hybridLogicalClock.setLogicalTime(1); + assertTrue(hybridLogicalClock.now() > TimestampType.HYBRID + .toTimestamp(TimeUnit.MILLISECONDS, currentTime - OFFSET, 1)); + } + + @Test public void testHLCUpdate() { + Clock.HLC hybridLogicalClock = new Clock.HLC(); + final long currentTime = System.currentTimeMillis(); + long messageTimestamp, timestamp; + /* case 1: Message timestamp is ahead of the current timestamp and the last given timestamp. + Has two sub cases: a) last physical timestamp ahead of current time. + b) current time ahead of last physical timestamp. + c) Skew Exception check. + */ + // 1a. + hybridLogicalClock.setPhysicalTime(currentTime + OFFSET); + hybridLogicalClock.setLogicalTime(1); + messageTimestamp = + TimestampType.HYBRID.toTimestamp(TimeUnit.MILLISECONDS, currentTime + 2 * OFFSET, 1); + try { + timestamp = hybridLogicalClock.update(messageTimestamp); + assertTrue(timestamp == TimestampType.HYBRID + .toTimestamp(TimeUnit.MILLISECONDS, currentTime + 2 * OFFSET, 2)); + } catch (Clock.MaxClockSkewException maxClockSkew) { + System.out.println(maxClockSkew.getMessage()); + } + //1b. + hybridLogicalClock.setPhysicalTime(currentTime - OFFSET); + hybridLogicalClock.setLogicalTime(1); + messageTimestamp = + TimestampType.HYBRID.toTimestamp(TimeUnit.MILLISECONDS, currentTime + 2 * OFFSET, 1); + try { + timestamp = hybridLogicalClock.update(messageTimestamp); + assertTrue(timestamp == TimestampType.HYBRID + .toTimestamp(TimeUnit.MILLISECONDS, currentTime + 2 * OFFSET, 2)); + } catch (Clock.MaxClockSkewException maxClockSkew) { + System.out.println(maxClockSkew.getMessage()); + } + + //1c. + hybridLogicalClock.setPhysicalTime(currentTime - OFFSET); + hybridLogicalClock.setLogicalTime(1); + messageTimestamp = + TimestampType.HYBRID.toTimestamp(TimeUnit.MILLISECONDS, currentTime + 10 * OFFSET, 1); + try { + timestamp = hybridLogicalClock.update(messageTimestamp); + assertTrue(false); + } catch (Clock.MaxClockSkewException maxClockSkew) { + assertTrue(true); + } + + // Case 2: Last Physical timestamp is ahead of message timestamp and current time + // Has two sub cases: a) message timestamp ahead of current time. + // b) current time is ahead of message timestamp. + + // 2a. + messageTimestamp = + TimestampType.HYBRID.toTimestamp(TimeUnit.MILLISECONDS, currentTime + 2 * OFFSET, 1); + hybridLogicalClock.setPhysicalTime(currentTime + 3 * OFFSET); + hybridLogicalClock.setLogicalTime(1); + try { + timestamp = hybridLogicalClock.update(messageTimestamp); + assertTrue(timestamp == TimestampType.HYBRID + .toTimestamp(TimeUnit.MILLISECONDS, currentTime + 3 * OFFSET, 2)); + } catch (Clock.MaxClockSkewException maxClockSkew) { + System.out.println(maxClockSkew.getMessage()); + } + + //2b. + messageTimestamp = + TimestampType.HYBRID.toTimestamp(TimeUnit.MILLISECONDS, currentTime - 2 * OFFSET, 1); + hybridLogicalClock.setPhysicalTime(currentTime + 3 * OFFSET); + hybridLogicalClock.setLogicalTime(1); + try { + timestamp = hybridLogicalClock.update(messageTimestamp); + assertTrue(timestamp == TimestampType.HYBRID + .toTimestamp(TimeUnit.MILLISECONDS, currentTime + 3 * OFFSET, 2)); + } catch (Clock.MaxClockSkewException maxClockSkew) { + System.out.println(maxClockSkew.getMessage()); + } + + // Case 3: Current time is ahead of message timestamp and last physical Timestamp. + // Has two sub cases: a) last physical timestamp ahead of message timestamp. + // b) message timestamp ahead of last physical timestamp + + //3a. + messageTimestamp = + TimestampType.HYBRID.toTimestamp(TimeUnit.MILLISECONDS, currentTime - 2 * OFFSET, 1); + hybridLogicalClock.setPhysicalTime(currentTime - OFFSET); + hybridLogicalClock.setLogicalTime(10); + try { + timestamp = hybridLogicalClock.update(messageTimestamp); + assertTrue( + timestamp > TimestampType.HYBRID.toTimestamp(TimeUnit.MILLISECONDS, currentTime, 0)); + } catch (Clock.MaxClockSkewException maxClockSkew) { + System.out.println(maxClockSkew.getMessage()); + } + + //3b. + messageTimestamp = + TimestampType.HYBRID.toTimestamp(TimeUnit.MILLISECONDS, currentTime - OFFSET, 1); + hybridLogicalClock.setPhysicalTime(currentTime - 2 * OFFSET); + hybridLogicalClock.setLogicalTime(10); + try { + timestamp = hybridLogicalClock.update(messageTimestamp); + assertTrue( + timestamp > TimestampType.HYBRID.toTimestamp(TimeUnit.MILLISECONDS, currentTime, 0)); + } catch (Clock.MaxClockSkewException maxClockSkew) { + System.out.println(maxClockSkew.getMessage()); + } + + } +} diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/TestTimestampType.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/TestTimestampType.java new file mode 100644 index 0000000..01c8314 --- /dev/null +++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/TestTimestampType.java @@ -0,0 +1,237 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase; + +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.*; + +@Category(SmallTests.class) +public class TestTimestampType { + + private static long testPhysicalTime = 1234567890123L; + private static long testLogicalTime = 12; + + /* + * Tests for TimestampType enum + */ + + @Test + public void testFromToEpoch() { + for (TimestampType timestamp : TimestampType.values()) { + long wallTime = System.currentTimeMillis(); + long converted = timestamp.toEpochTimeMillisFromTimestamp( + timestamp.fromEpochTimeMillisToTimestamp(wallTime)); + + assertEquals(wallTime, converted); + } + } + + /* Tests for HL Clock */ + @Test + public void testHybridMaxValues() { + // assert 44-bit Physical Time with signed comparison (actual 43 bits) + assertEquals( + (1L << (63-TimestampType.HYBRID.getBitsForLogicalTime())) - 1, + TimestampType.HYBRID.getMaxPhysicalTime()); + + // assert 20-bit Logical Time + assertEquals( + (1L << TimestampType.HYBRID.getBitsForLogicalTime()) - 1, + TimestampType.HYBRID.getMaxLogicalTime()); + + // assert that maximum representable timestamp is Long.MAX_VALUE (assuming signed comparison). + assertEquals( + Long.MAX_VALUE, + TimestampType.HYBRID.toTimestamp(TimeUnit.MILLISECONDS, + TimestampType.HYBRID.getMaxPhysicalTime(), + TimestampType.HYBRID.getMaxLogicalTime()) + ); + } + + @Test + public void testHybridGetPhysicalTime() { + long ts = TimestampType.HYBRID.toTimestamp(TimeUnit.MILLISECONDS, testPhysicalTime, testLogicalTime); + assertEquals(testPhysicalTime, TimestampType.HYBRID.getPhysicalTime(ts)); + } + + @Test + public void testHybridGetLogicalTime() { + long ts = TimestampType.HYBRID.toTimestamp(TimeUnit.MILLISECONDS, testPhysicalTime, testLogicalTime); + assertEquals(testLogicalTime, TimestampType.HYBRID.getLogicalTime(ts)); + } + + @Test + public void testHybridToString() { + long ts = TimestampType.HYBRID.toTimestamp(TimeUnit.MILLISECONDS, testPhysicalTime, testLogicalTime); + + assertEquals("2009-02-13T23:31:30:123(1234567890123), 12", TimestampType.HYBRID.toString(ts)); + } + + @Test + public void testHybridToTimestamp() { + long expected = (testPhysicalTime << TimestampType.HYBRID.getBitsForLogicalTime()) + testLogicalTime; + // test millisecond + long ts = TimestampType.HYBRID.toTimestamp(TimeUnit.MILLISECONDS, testPhysicalTime, testLogicalTime); + assertEquals(ts, expected); + + // test nanosecond + ts = TimestampType.HYBRID.toTimestamp(TimeUnit.NANOSECONDS, TimeUnit.MILLISECONDS.toNanos(testPhysicalTime), testLogicalTime); + assertEquals(ts, expected); + } + + @Test + public void testHybridIsLikelyOfType() throws ParseException { + SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss:SSS Z"); + + // test timestamps of Hybrid type from year 1971 to 2248 where lt = 0 + for (int year = 1971; year <= 2248; year += 1) { + Date date = dateFormat.parse(year + "-01-01T11:22:33:444 UTC"); + + // Hybrid type ts with pt = date and lt = 0 + long ts = TimestampType.HYBRID.toTimestamp(TimeUnit.MILLISECONDS, date.getTime(), 0); + System.out.println(TimestampType.HYBRID.toString(ts)); + + assertTrue(TimestampType.HYBRID.isLikelyOfType(ts, true)); + } + + // test timestamps of Hybrid type from year 2016 to 2348 where lt > 0 + for (int year = 2016; year <= 2248; year += 1) { + Date date = dateFormat.parse(year + "-01-01T11:22:33:444 UTC"); + + // Hybrid type ts with pt = date and lt = 123 + long ts = TimestampType.HYBRID.toTimestamp(TimeUnit.MILLISECONDS, date.getTime(), 123); + System.out.println(TimestampType.HYBRID.toString(ts)); + + assertTrue(TimestampType.HYBRID.isLikelyOfType(ts, true)); + } + + // test that timestamps from different years are not Hybrid type + for (int year = 1970; year <= 10000 ;year += 10) { + // Stardate 1970 to 10000 + Date date = dateFormat.parse(year + "-01-01T00:00:00:000 UTC"); + long ts = date.getTime(); + System.out.println(TimestampType.PHYSICAL.toString(ts)); + System.out.println(TimestampType.PHYSICAL.toString(TimestampType.HYBRID.getPhysicalTime(ts))); + + assertFalse(TimestampType.HYBRID.isLikelyOfType(ts, true)); + } + + // test that timestamps up to 2016 are not Hybrid even if lt = 0 + for (int year = 1970; year <= 2016; year += 1) { + Date date = dateFormat.parse(year + "-01-01T11:22:33:444 UTC"); + + // reset lt = 0 + long ts = ((date.getTime() + >> TimestampType.HYBRID.getBitsForLogicalTime()) << TimestampType.HYBRID.getBitsForLogicalTime()); + System.out.println(Long.toHexString(ts)); + + System.out.println(TimestampType.PHYSICAL.toString(ts)); + System.out.println(TimestampType.PHYSICAL.toString(TimestampType.HYBRID.getPhysicalTime(ts))); + + assertFalse(TimestampType.HYBRID.isLikelyOfType(ts, true)); + } + + // test that timestamps from currentTime epoch are not Hybrid type + long systemTimeNow = System.currentTimeMillis(); + System.out.println(TimestampType.PHYSICAL.toString(systemTimeNow)); + System.out.println(TimestampType.PHYSICAL.toString((TimestampType.HYBRID.getPhysicalTime(systemTimeNow)))); + assertFalse(TimestampType.HYBRID.isLikelyOfType(systemTimeNow, true)); + } + + + @Test + public void testPhysicalMaxValues() { + assertEquals( + (1L << 63) - 1, + TimestampType.PHYSICAL.getMaxPhysicalTime()); + + assertEquals(0, TimestampType.PHYSICAL.getMaxLogicalTime()); + } + + @Test + public void testPhysicalGetPhysicalTime() { + long ts = TimestampType.PHYSICAL.toTimestamp(TimeUnit.MILLISECONDS, testPhysicalTime, testLogicalTime); + assertEquals(testPhysicalTime, TimestampType.PHYSICAL.getPhysicalTime(ts)); + } + + @Test + public void testPhysicalGetLogicalTime() { + long ts = TimestampType.PHYSICAL.toTimestamp(TimeUnit.MILLISECONDS, testPhysicalTime, testLogicalTime); + assertEquals(0, TimestampType.PHYSICAL.getLogicalTime(ts)); + } + + @Test + public void testPhysicalToString() { + long ts = TimestampType.PHYSICAL.toTimestamp(TimeUnit.MILLISECONDS, testPhysicalTime, testLogicalTime); + + assertEquals("2009-02-13T23:31:30:123(1234567890123), 0", TimestampType.PHYSICAL.toString(ts)); + } + + @Test + public void testPhysicalToTimestamp() { + // test millisecond + long ts = TimestampType.PHYSICAL.toTimestamp(TimeUnit.MILLISECONDS, testPhysicalTime, testLogicalTime); + assertEquals(ts, testPhysicalTime); + + // test nanosecond + ts = TimestampType.PHYSICAL.toTimestamp(TimeUnit.NANOSECONDS, TimeUnit.MILLISECONDS.toNanos(testPhysicalTime), testLogicalTime); + assertEquals(ts, testPhysicalTime); + } + + @Test + public void testPhysicalIsLikelyOfType() throws ParseException { + SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss:SSS Z"); + + // test that timestamps from 1970 to 3K epoch are of Physical type + for (int year = 1970; year < 3000 ;year += 10) { + // Start date 1970 to 10000 + Date date = dateFormat.parse(year + "-01-01T00:00:00:000 UTC"); + long ts = date.getTime(); + System.out.println(TimestampType.PHYSICAL.toString(ts)); + System.out.println(TimestampType.PHYSICAL.toString(TimestampType.HYBRID.getPhysicalTime(ts))); + + assertTrue(TimestampType.PHYSICAL.isLikelyOfType(ts, true)); + } + + // test that timestamps from currentTime epoch are of Physical type + long systemTimeNow = System.currentTimeMillis(); + System.out.println(TimestampType.PHYSICAL.toString(systemTimeNow)); + assertTrue(TimestampType.PHYSICAL.isLikelyOfType(systemTimeNow, true)); + + // test timestamps of Hybrid type from year 1970 to 2248 are not of Physical type + for (int year = 1970; year <= 2248; year += 1) { + Date date = dateFormat.parse(year + "-01-01T11:22:33:444 UTC"); + + // Hybrid type ts with pt = date and lt = 0 + long ts = TimestampType.HYBRID.toTimestamp(TimeUnit.MILLISECONDS, date.getTime(), 0); + System.out.println(TimestampType.HYBRID.toString(ts)); + System.out.println(TimestampType.PHYSICAL.toString(ts)); + + assertFalse(TimestampType.PHYSICAL.isLikelyOfType(ts, true)); + } + } +} \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index e03993f..20b81c7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -78,6 +78,8 @@ import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.Clock; +import org.apache.hadoop.hbase.ClockType; import org.apache.hadoop.hbase.HealthCheckChore; import org.apache.hadoop.hbase.MetaTableAccessor; import org.apache.hadoop.hbase.NotServingRegionException; @@ -309,6 +311,10 @@ public class HRegionServer extends HasThread implements // debugging and unit tests. private volatile boolean abortRequested; + protected Clock.HLC hybridLogicalClock = new Clock.HLC(); + protected Clock.SystemMonotonic systemMonotonicClock = new Clock.SystemMonotonic(); + protected Clock.System systemClock = new Clock.System(); + ConcurrentMap rowlocks = new ConcurrentHashMap(); // A state before we go into stopped state. At this stage we're closing user @@ -1905,6 +1911,17 @@ public class HRegionServer extends HasThread implements } @Override + public Clock getRegionServerClock(ClockType clockType) { + if(clockType == ClockType.HLC){ + return hybridLogicalClock; + } else if(clockType == ClockType.SYSTEM_MONOTONIC){ + return systemMonotonicClock; + } else { + return systemClock; + } + } + + @Override public Connection getConnection() { return getClusterConnection(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java index 356a88b..cde287d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java @@ -36,6 +36,8 @@ import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.Regio import org.apache.hadoop.hbase.quotas.RegionServerQuotaManager; import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; import org.apache.hadoop.hbase.wal.WAL; +import org.apache.hadoop.hbase.Clock; +import org.apache.hadoop.hbase.ClockType; import org.apache.zookeeper.KeeperException; import com.google.protobuf.Service; @@ -55,6 +57,8 @@ public interface RegionServerServices extends OnlineRegions, FavoredNodesForRegi * default (common) WAL */ WAL getWAL(HRegionInfo regionInfo) throws IOException; + Clock getRegionServerClock(ClockType clockType); + /** * @return Implementation of {@link CompactionRequestor} or null. */ diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java index 6f225d6..01eb8f6 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java @@ -250,6 +250,10 @@ public class MockRegionServerServices implements RegionServerServices { return null; } + @Override public Clock getRegionServerClock(ClockType clockType) { + return null; + } + @Override public ExecutorService getExecutorService() { return null; diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala index 0c29f50..1a3c370 100644 --- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala +++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala @@ -26,7 +26,11 @@ import org.apache.hadoop.hbase.mapred.TableOutputFormat import org.apache.hadoop.hbase.spark.datasources._ import org.apache.hadoop.hbase.types._ import org.apache.hadoop.hbase.util.{Bytes, PositionedByteRange, SimplePositionedMutableByteRange} -import org.apache.hadoop.hbase._ +import org.apache.hadoop.hbase.HBaseConfiguration +import org.apache.hadoop.hbase.HTableDescriptor +import org.apache.hadoop.hbase.HColumnDescriptor +import org.apache.hadoop.hbase.TableName +import org.apache.hadoop.hbase.CellUtil import org.apache.hadoop.mapred.JobConf import org.apache.spark.Logging import org.apache.spark.rdd.RDD