diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/TimestampType.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/TimestampType.java new file mode 100644 index 0000000..4033744 --- /dev/null +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/TimestampType.java @@ -0,0 +1,321 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase; + +import org.apache.commons.lang.time.FastDateFormat; + +import java.util.TimeZone; +import java.util.concurrent.TimeUnit; + +/** + * {@link TimestampType} is an enum to represent different ways of encoding time in HBase using 64 bits. + * Time is usually encoded as a 64-bit long in {@link org.apache.hadoop.hbase.Cell} timestamps and is used for sorting + * {@link org.apache.hadoop.hbase.Cell}s, ordering writes etc. It has methods which help in constructing or interpreting + * the 64 bit timestamp and getter methods to read the hard coded constants of the particular {@link TimestampType}. + * + *

+ * Enum {@link TimestampType} is dumb in a way. It doesn't have any logic other than interpreting the 64 bits. + * Any monotonically increasing or monotonically non-decreasing semantics of the timestamps are the responsibility of the + * clock implementation generating the particular timestamps. There can be several clock implementations, + * and each such implementation can map its representation of the timestamp to one of the available Timestamp types i.e. + * {@link #HYBRID} or {@link #PHYSICAL}. In essence, the {@link TimestampType} is only used internally by the + * Clock implementations and thus never exposed to the user. The user has to know only the different available clock + * types. So, for the user timestamp types do not exist. + *

+ */ + +enum TimestampType { + /** + * Hybrid is a Timestamp type used to encode both physical time and logical time components into a single + * 64 bit lon integer. It has methods to decipher the 64 bit hybrid timestamp and also to construct the + * hybrid timestamp. + */ + HYBRID { + /** + * Hard coded 44-bits for physical time, with most significant bit carrying the sign i.e 0 + * as we are dealing with positive integers and the remaining 43 bits are to be interpreted as + * NTP timestamp. See HBASE-14070 + * for understanding the choice of going with the millisecond resolution for physical time. Thus allowing + * us to represent all the dates between unix epoch (1970) and year 2248 with signed timestamp comparison + * with 44 bits for physical time assuming a millisecond resolution with signed long integers. + * Picking 42 bits to represent the physical time has the problem of representing time until 2039 only, + * with signed integers, might cause Y2k39 bug hoping HBase to be around till then. The trade-off here is + * with the year until we can represent the physical time vs if we are able capture all the events in the worst + * case(read: leap seconds etc) without the logical component of the timestamp overflowing. With 20 bits + * for logical time, one can represent upto one million events at the same millisecond. In case of leap + * seconds, the no of events happening in the same second is very unlikely to exceed one million. + */ + @SuppressWarnings("unused") + private static final int BITS_FOR_PHYSICAL_TIME = 44; + + /** + * Remaining 20-bits for logical time, allowing values up to 1,048,576. Logical Time is the least + * significant part of the 64 bit timestamp, so unsigned comparison can be used for LT. + */ + + private static final int BITS_FOR_LOGICAL_TIME = 20; + + /** + * Max value for physical time in the {@link #HYBRID} timestamp representation, inclusive. This assumes signed comparison. + */ + private static final long PHYSICAL_TIME_MAX_VALUE = 0x7ffffffffffL; + + /** + * Max value for logical time in the {@link #HYBRID} timestamp representation + */ + static final long LOGICAL_TIME_MAX_VALUE = 0xfffffL; + + public long toEpochTimeMillisFromTimestamp(long timestamp){ + return getPhysicalTime(timestamp); + } + + long fromEpochTimeMillisToTimestamp(long timestamp){ + return toTimestamp(TimeUnit.MILLISECONDS, timestamp, 0); + } + + long toTimestamp(TimeUnit timeUnit, long physicalTime, long logicalTime){ + physicalTime = TimeUnit.MILLISECONDS.convert(physicalTime, timeUnit); + return (physicalTime << BITS_FOR_LOGICAL_TIME) + logicalTime; + } + + long getPhysicalTime(long timestamp){ + return timestamp >>> BITS_FOR_LOGICAL_TIME; // assume unsigned timestamp + } + + long getLogicalTime(long timestamp){ + return timestamp & LOGICAL_TIME_MAX_VALUE; + } + + long getMaxPhysicalTime(){ + return PHYSICAL_TIME_MAX_VALUE; + } + + long getMaxLogicalTime(){ + return LOGICAL_TIME_MAX_VALUE; + } + + int getBitsForLogicalTime() { + return BITS_FOR_LOGICAL_TIME; + } + + /** + * Returns whether the given timestamp is "likely" of {@link #HYBRID} {@link TimestampType}. Timestamp + * implementations can use the full range of 64bit longs to represent physical and logical + * components of time. However, this method returns whether the given timestamp is a likely + * representation depending on heuristics for the clock implementation. + * + * Hybrid timestamps are checked whether they belong to Hybrid range assuming + * that Hybrid timestamps will only have > 0 logical time component for timestamps corresponding to years after + * 2016. This method will return false if lt > 0 and year is before 2016. + * Due to left shifting for Hybrid time, all millisecond-since-epoch timestamps from years 1970-10K fall into + * year 1970 when interpreted as Hybrid timestamps. Thus, {@link #isLikelyOfType(long)} will return false for + * timestamps which are in the year 1970 and logical time = 0 when interpreted as of type Hybrid Time. + * + *

+ * Note that this method uses heuristics which may not hold + * if system timestamps are intermixed from client side and server side or timestamp + * sources other than system clock are used. + *

+ * @param timestamp {@link #HYBRID} Timestamp + * @return true if the timestamp is likely to be of the corresponding {@link TimestampType} else false + */ + boolean isLikelyOfType(long timestamp){ + long physicalTime = getPhysicalTime(timestamp); + long logicalTime = getLogicalTime(timestamp); + + // heuristic 1: Up until year 2016 (1451635200000), lt component cannot be non-zero. + if (physicalTime < 1451635200000L && logicalTime != 0) { + return false; + } else if (physicalTime < 31536000000L) { + // heuristic 2: Even if lt = 0, pt should be before year 1971 (31536000000L). + // Due to left shifting by 22, all epoch ms timestamps from SYSTEM_MONOTONIC timestamp + // end up in year 1970, even for epoch time for the year 10000. + // this assumes Hybrid time is not used to represent timestamps for year 1970 UTC. + return false; + } + return true; + } + + boolean isLikelyOfType(long timestamp, boolean isClockMonotonic){ + return isLikelyOfType(timestamp); + } + + /** + * Returns a string representation for Physical Time and Logical Time components. The format is: + * yyyy-MM-dd HH:mm:ss:SSS(Physical Time),Logical Time + * Physical Time is converted to UTC time and not to local time for uniformity. + * Example: 2015-07-17 16:56:35:891(1437177395891), 0 + * @param timestamp A {@link #HYBRID} Timestamp + * @return A date time string formatted as mentioned in the method description + */ + String toString(long timestamp){ + long physicalTime = getPhysicalTime(timestamp); + long logicalTime = getLogicalTime(timestamp); + return new StringBuilder() + .append(dateFormat.format(physicalTime)) + .append("(").append(physicalTime).append(")") + .append(", ").append(logicalTime) + .toString(); + } + }, + + /** + * Physical is a Timestamp type used to encode the physical time in 64 bits. + * It has helper methods to decipher the 64 bit encoding of physical time. + */ + PHYSICAL { + long toEpochTimeMillisFromTimestamp(long timestamp) { + return timestamp; + } + + long fromEpochTimeMillisToTimestamp(long timestamp) { + return timestamp; + } + + long toTimestamp(TimeUnit timeUnit, long physicalTime, long logicalTime) { + return TimeUnit.MILLISECONDS.convert(physicalTime, timeUnit); + } + + long getPhysicalTime(long timestamp) { + return timestamp; + } + + long getLogicalTime(long timestamp) { + return 0; + } + + long getMaxPhysicalTime(){ + return Long.MAX_VALUE; + } + + long getMaxLogicalTime(){ + return 0; + } + + int getBitsForLogicalTime() { + return 0; + } + + /** + * + * @param timestamp epoch time in milliseconds + * @return True if the timestamp generated by the clock(Assumed to be Monotonic) is of type {@link #PHYSICAL} else False + */ + boolean isLikelyOfType(long timestamp) { + return isLikelyOfType(timestamp, true); + } + + boolean isLikelyOfType(long timestamp, boolean isClockMonotonic) { + // heuristic: the timestamp should be up to year 3K (32503680000000L). + if(!isClockMonotonic) { + return true; + } + return timestamp < 32503680000000L; + } + + /** + * Returns a string representation for Physical Time and Logical Time components. The format is: + * yyyy-MM-dd HH:mm:ss:SSS(Physical Time) + * Physical Time is converted to UTC time and not to local time for uniformity. + * Example: 2015-07-17 16:56:35:891(1437177395891), 0 + * @param timestamp epoch time in milliseconds + * @return A date time string formatted as mentioned in the method description + */ + String toString(long timestamp) { + long physicalTime = timestamp; + return new StringBuilder() + .append(dateFormat.format(physicalTime)) + .append("(").append(physicalTime).append(")") + .append(", ").append("0") + .toString(); + } + }; + + /** + * This is used internally by the enum methods of Hybrid and Physical Timestamp types to convert the + * timestamp to the format set here. UTC timezone instead of local time zone for convenience and uniformity + */ + private static final FastDateFormat dateFormat + = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss:SSS", TimeZone.getTimeZone("UTC")); + + + /** + * Converts the given timestamp to the unix epoch timestamp with millisecond resolution. + * Returned timestamp is compatible with System.currentTimeMillis(). + * @param timestamp {@link #HYBRID} or {@link #PHYSICAL} Timestamp + * @return number of milliseconds from epoch + */ + abstract long toEpochTimeMillisFromTimestamp(long timestamp); + + /** + * Converts the given time in milliseconds to the corresponding {@link TimestampType} representation. + * @param timeInMillis epoch time in {@link TimeUnit#MILLISECONDS} + * @return a timestamp representation corresponding to {@link TimestampType}. + */ + abstract long fromEpochTimeMillisToTimestamp(long timeInMillis); + + /** + * Converts the given physical clock in the given {@link TimeUnit} to a 64-bit timestamp + * @param timeUnit a time unit as in the enum {@link TimeUnit} + * @param physicalTime physical time + * @param logicalTime logical time + * @return a timestamp in 64 bits + */ + abstract long toTimestamp(TimeUnit timeUnit, long physicalTime, long logicalTime); + + /** + * Extracts and returns the physical time from the timestamp + * @param timestamp {@link #HYBRID} or {@link #PHYSICAL} Timestamp + * @return physical time in {@link TimeUnit#MILLISECONDS} + */ + abstract long getPhysicalTime(long timestamp); + + /** + * Extracts and returns the logical time from the timestamp + * @param timestamp {@link #HYBRID} or {@link #PHYSICAL} Timestamp + * @return logical time + */ + abstract long getLogicalTime(long timestamp); + + /** + * @return the maximum possible physical time in {@link TimeUnit#MILLISECONDS} + */ + abstract long getMaxPhysicalTime(); + + /** + * @return the maximum possible logical time + */ + abstract long getMaxLogicalTime(); + /** + * @return number of least significant bits allocated for logical time + */ + abstract int getBitsForLogicalTime(); + + /** + * @param timestamp epoch time in milliseconds + * @param isClockMonotonic if the clock that generated this timestamp is monotonic + * @return True if the timestamp generated by the clock is of type {@link #PHYSICAL} else False + */ + abstract boolean isLikelyOfType(long timestamp, boolean isClockMonotonic); + + abstract boolean isLikelyOfType(long timestamp); + + abstract String toString(long timestamp); + +} diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/TestTimestampType.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/TestTimestampType.java new file mode 100644 index 0000000..4728581 --- /dev/null +++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/TestTimestampType.java @@ -0,0 +1,237 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase; + +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.*; + +@Category(SmallTests.class) +public class TestTimestampType { + + private static long testPhysicalTime = 1234567890123L; + private static long testLogicalTime = 12; + + /* + * Tests for TimestampType enum + */ + + @Test + public void testFromToEpoch() { + for (TimestampType timestamp : TimestampType.values()) { + long wallTime = System.currentTimeMillis(); + long converted = timestamp.toEpochTimeMillisFromTimestamp( + timestamp.fromEpochTimeMillisToTimestamp(wallTime)); + + assertEquals(wallTime, converted); + } + } + + /* Tests for HL Clock */ + @Test + public void testHybridMaxValues() { + // assert 44-bit Physical Time with signed comparison (actual 43 bits) + assertEquals( + (1L << (63-TimestampType.HYBRID.getBitsForLogicalTime())) - 1, + TimestampType.HYBRID.getMaxPhysicalTime()); + + // assert 20-bit Logical Time + assertEquals( + (1L << TimestampType.HYBRID.getBitsForLogicalTime()) - 1, + TimestampType.HYBRID.getMaxLogicalTime()); + + // assert that maximum representable timestamp is Long.MAX_VALUE (assuming signed comparison). + assertEquals( + Long.MAX_VALUE, + TimestampType.HYBRID.toTimestamp(TimeUnit.MILLISECONDS, + TimestampType.HYBRID.getMaxPhysicalTime(), + TimestampType.HYBRID.getMaxLogicalTime()) + ); + } + + @Test + public void testHybridGetPhysicalTime() { + long ts = TimestampType.HYBRID.toTimestamp(TimeUnit.MILLISECONDS, testPhysicalTime, testLogicalTime); + assertEquals(testPhysicalTime, TimestampType.HYBRID.getPhysicalTime(ts)); + } + + @Test + public void testHybridGetLogicalTime() { + long ts = TimestampType.HYBRID.toTimestamp(TimeUnit.MILLISECONDS, testPhysicalTime, testLogicalTime); + assertEquals(testLogicalTime, TimestampType.HYBRID.getLogicalTime(ts)); + } + + @Test + public void testHybridToString() { + long ts = TimestampType.HYBRID.toTimestamp(TimeUnit.MILLISECONDS, testPhysicalTime, testLogicalTime); + + assertEquals("2009-02-13 23:31:30:123(1234567890123), 12", TimestampType.HYBRID.toString(ts)); + } + + @Test + public void testHybridToTimestamp() { + long expected = (testPhysicalTime << TimestampType.HYBRID.getBitsForLogicalTime()) + testLogicalTime; + // test millisecond + long ts = TimestampType.HYBRID.toTimestamp(TimeUnit.MILLISECONDS, testPhysicalTime, testLogicalTime); + assertEquals(ts, expected); + + // test nanosecond + ts = TimestampType.HYBRID.toTimestamp(TimeUnit.NANOSECONDS, TimeUnit.MILLISECONDS.toNanos(testPhysicalTime), testLogicalTime); + assertEquals(ts, expected); + } + + @Test + public void testHybridIsLikelyOfType() throws ParseException { + SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS Z"); + + // test timestamps of Hybrid type from year 1971 to 2248 where lt = 0 + for (int year = 1971; year <= 2248; year += 1) { + Date date = dateFormat.parse(year + "-01-01 11:22:33:444 UTC"); + + // Hybrid type ts with pt = date and lt = 0 + long ts = TimestampType.HYBRID.toTimestamp(TimeUnit.MILLISECONDS, date.getTime(), 0); + System.out.println(TimestampType.HYBRID.toString(ts)); + + assertTrue(TimestampType.HYBRID.isLikelyOfType(ts)); + } + + // test timestamps of Hybrid type from year 2016 to 2348 where lt > 0 + for (int year = 2016; year <= 2248; year += 1) { + Date date = dateFormat.parse(year + "-01-01 11:22:33:444 UTC"); + + // Hybrid type ts with pt = date and lt = 123 + long ts = TimestampType.HYBRID.toTimestamp(TimeUnit.MILLISECONDS, date.getTime(), 123); + System.out.println(TimestampType.HYBRID.toString(ts)); + + assertTrue(TimestampType.HYBRID.isLikelyOfType(ts)); + } + + // test that timestamps from different years are not Hybrid type + for (int year = 1970; year <= 10000 ;year += 10) { + // Stardate 1970 to 10000 + Date date = dateFormat.parse(year + "-01-01 00:00:00:000 UTC"); + long ts = date.getTime(); + System.out.println(TimestampType.PHYSICAL.toString(ts)); + System.out.println(TimestampType.PHYSICAL.toString(TimestampType.HYBRID.getPhysicalTime(ts))); + + assertFalse(TimestampType.HYBRID.isLikelyOfType(ts)); + } + + // test that timestamps up to 2016 are not Hybrid even if lt = 0 + for (int year = 1970; year <= 2016; year += 1) { + Date date = dateFormat.parse(year + "-01-01 11:22:33:444 UTC"); + + // reset lt = 0 + long ts = ((date.getTime() + >> TimestampType.HYBRID.getBitsForLogicalTime()) << TimestampType.HYBRID.getBitsForLogicalTime()); + System.out.println(Long.toHexString(ts)); + + System.out.println(TimestampType.PHYSICAL.toString(ts)); + System.out.println(TimestampType.PHYSICAL.toString(TimestampType.HYBRID.getPhysicalTime(ts))); + + assertFalse(TimestampType.HYBRID.isLikelyOfType(ts)); + } + + // test that timestamps from currentTime epoch are not Hybrid type + long systemTimeNow = System.currentTimeMillis(); + System.out.println(TimestampType.PHYSICAL.toString(systemTimeNow)); + System.out.println(TimestampType.PHYSICAL.toString((TimestampType.HYBRID.getPhysicalTime(systemTimeNow)))); + assertFalse(TimestampType.HYBRID.isLikelyOfType(systemTimeNow)); + } + + + @Test + public void testPhysicalMaxValues() { + assertEquals( + (1L << 63) - 1, + TimestampType.PHYSICAL.getMaxPhysicalTime()); + + assertEquals(0, TimestampType.PHYSICAL.getMaxLogicalTime()); + } + + @Test + public void testPhysicalGetPhysicalTime() { + long ts = TimestampType.PHYSICAL.toTimestamp(TimeUnit.MILLISECONDS, testPhysicalTime, testLogicalTime); + assertEquals(testPhysicalTime, TimestampType.PHYSICAL.getPhysicalTime(ts)); + } + + @Test + public void testPhysicalGetLogicalTime() { + long ts = TimestampType.PHYSICAL.toTimestamp(TimeUnit.MILLISECONDS, testPhysicalTime, testLogicalTime); + assertEquals(0, TimestampType.PHYSICAL.getLogicalTime(ts)); + } + + @Test + public void testPhysicalToString() { + long ts = TimestampType.PHYSICAL.toTimestamp(TimeUnit.MILLISECONDS, testPhysicalTime, testLogicalTime); + + assertEquals("2009-02-13 23:31:30:123(1234567890123), 0", TimestampType.PHYSICAL.toString(ts)); + } + + @Test + public void testPhysicalToTimestamp() { + // test millisecond + long ts = TimestampType.PHYSICAL.toTimestamp(TimeUnit.MILLISECONDS, testPhysicalTime, testLogicalTime); + assertEquals(ts, testPhysicalTime); + + // test nanosecond + ts = TimestampType.PHYSICAL.toTimestamp(TimeUnit.NANOSECONDS, TimeUnit.MILLISECONDS.toNanos(testPhysicalTime), testLogicalTime); + assertEquals(ts, testPhysicalTime); + } + + @Test + public void testPhysicalIsLikelyOfType() throws ParseException { + SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS Z"); + + // test that timestamps from 1970 to 3K epoch are of Physical type + for (int year = 1970; year < 3000 ;year += 10) { + // Start date 1970 to 10000 + Date date = dateFormat.parse(year + "-01-01 00:00:00:000 UTC"); + long ts = date.getTime(); + System.out.println(TimestampType.PHYSICAL.toString(ts)); + System.out.println(TimestampType.PHYSICAL.toString(TimestampType.HYBRID.getPhysicalTime(ts))); + + assertTrue(TimestampType.PHYSICAL.isLikelyOfType(ts)); + } + + // test that timestamps from currentTime epoch are of Physical type + long systemTimeNow = System.currentTimeMillis(); + System.out.println(TimestampType.PHYSICAL.toString(systemTimeNow)); + assertTrue(TimestampType.PHYSICAL.isLikelyOfType(systemTimeNow)); + + // test timestamps of Hybrid type from year 1970 to 2248 are not of Physical type + for (int year = 1970; year <= 2248; year += 1) { + Date date = dateFormat.parse(year + "-01-01 11:22:33:444 UTC"); + + // Hybrid type ts with pt = date and lt = 0 + long ts = TimestampType.HYBRID.toTimestamp(TimeUnit.MILLISECONDS, date.getTime(), 0); + System.out.println(TimestampType.HYBRID.toString(ts)); + System.out.println(TimestampType.PHYSICAL.toString(ts)); + + assertFalse(TimestampType.PHYSICAL.isLikelyOfType(ts)); + } + } +} \ No newline at end of file diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala index 0c29f50..4369a77 100644 --- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala +++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala @@ -20,13 +20,17 @@ package org.apache.hadoop.hbase.spark import java.util import java.util.concurrent.ConcurrentLinkedQueue +import org.apache.hadoop.hbase.HBaseConfiguration +import org.apache.hadoop.hbase.HTableDescriptor +import org.apache.hadoop.hbase.HColumnDescriptor +import org.apache.hadoop.hbase.TableName +import org.apache.hadoop.hbase.CellUtil import org.apache.hadoop.hbase.client._ import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.mapred.TableOutputFormat import org.apache.hadoop.hbase.spark.datasources._ import org.apache.hadoop.hbase.types._ import org.apache.hadoop.hbase.util.{Bytes, PositionedByteRange, SimplePositionedMutableByteRange} -import org.apache.hadoop.hbase._ import org.apache.hadoop.mapred.JobConf import org.apache.spark.Logging import org.apache.spark.rdd.RDD