diff --git itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/hbase/TestHBaseSchemaTool.java itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/hbase/TestHBaseSchemaTool.java index 79c9e08..e5833b8 100644 --- itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/hbase/TestHBaseSchemaTool.java +++ itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/hbase/TestHBaseSchemaTool.java @@ -577,6 +577,6 @@ public void oneMondoTest() throws Exception { outStr = new ByteArrayOutputStream(); out = new PrintStream(outStr); tool.go(false, HBaseReadWrite.SEQUENCES_TABLE, null, "whatever", conf, out, err); - Assert.assertEquals("mk: 1" + lsep, outStr.toString()); + Assert.assertEquals("cv_PERMANENT_FUNCTION: 3\nmaster_key: 1" + lsep, outStr.toString()); } } diff --git metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseReadWrite.java metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseReadWrite.java index 61257f0..7ed825f 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseReadWrite.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseReadWrite.java @@ -32,7 +32,6 @@ import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HTableInterface; -import org.apache.hadoop.hbase.client.Increment; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; @@ -101,7 +100,6 @@ final static String TABLE_TABLE = "HBMS_TBLS"; final static String USER_TO_ROLE_TABLE = "HBMS_USER_TO_ROLE"; final static String FILE_METADATA_TABLE = "HBMS_FILE_METADATA"; - final static String CHANGE_VERSION_TABLE = "HBMS_CHANGE_VERSION"; final static byte[] CATALOG_CF = "c".getBytes(HBaseUtils.ENCODING); final static byte[] STATS_CF = "s".getBytes(HBaseUtils.ENCODING); final static String NO_CACHE_CONF = "no.use.cache"; @@ -111,7 +109,7 @@ public final static String[] tableNames = { AGGR_STATS_TABLE, DB_TABLE, FUNC_TABLE, GLOBAL_PRIVS_TABLE, PART_TABLE, USER_TO_ROLE_TABLE, ROLE_TABLE, SD_TABLE, SECURITY_TABLE, SEQUENCES_TABLE, - TABLE_TABLE, FILE_METADATA_TABLE, CHANGE_VERSION_TABLE }; + TABLE_TABLE, FILE_METADATA_TABLE }; public final static Map> columnFamilies = new HashMap<> (tableNames.length); static { @@ -128,20 +126,22 @@ columnFamilies.put(TABLE_TABLE, Arrays.asList(CATALOG_CF, STATS_CF)); // Stats CF will contain PPD stats. columnFamilies.put(FILE_METADATA_TABLE, Arrays.asList(CATALOG_CF, STATS_CF)); - columnFamilies.put(CHANGE_VERSION_TABLE, Arrays.asList(CATALOG_CF)); } + final static byte[] MASTER_KEY_SEQUENCE = "master_key".getBytes(HBaseUtils.ENCODING); + // The change version functionality uses the sequences table, but we don't want to give the + // caller complete control over the sequence name as they might inadvertently clash with one of + // our sequence keys, so add a prefix to their topic name. + final static String CHANGE_VERSION_SEQUENCE_PREFIX = "cv_"; + final static byte[] AGGR_STATS_BLOOM_COL = "b".getBytes(HBaseUtils.ENCODING); private final static byte[] AGGR_STATS_STATS_COL = "s".getBytes(HBaseUtils.ENCODING); - final static byte[] MASTER_KEY_SEQUENCE = "mk".getBytes(HBaseUtils.ENCODING); private final static byte[] CATALOG_COL = "c".getBytes(HBaseUtils.ENCODING); private final static byte[] ROLES_COL = "roles".getBytes(HBaseUtils.ENCODING); private final static byte[] REF_COUNT_COL = "ref".getBytes(HBaseUtils.ENCODING); private final static byte[] DELEGATION_TOKEN_COL = "dt".getBytes(HBaseUtils.ENCODING); private final static byte[] MASTER_KEY_COL = "mk".getBytes(HBaseUtils.ENCODING); private final static byte[] GLOBAL_PRIVS_KEY = "gp".getBytes(HBaseUtils.ENCODING); - private final static byte[] SEQUENCES_KEY = "seq".getBytes(HBaseUtils.ENCODING); - private final static byte[] CV_COL = "cv".getBytes(HBaseUtils.ENCODING); private final static int TABLES_TO_CACHE = 10; // False positives are very bad here because they cause us to invalidate entries we shouldn't. // Space used and # of hash functions grows in proportion to ln of num bits so a 10x increase @@ -2146,23 +2146,16 @@ private ColumnStatistics buildColStats(byte[] key, boolean fromTable) throws IOE *********************************************************************************************/ public long getChangeVersion(String topic) throws IOException { - byte[] key = HBaseUtils.buildKey(topic); - byte[] result = read(CHANGE_VERSION_TABLE, key, CATALOG_CF, CV_COL); - return (result == null) ? -1 : Long.valueOf(new String(result, HBaseUtils.ENCODING)); + byte[] key = HBaseUtils.buildKey(CHANGE_VERSION_SEQUENCE_PREFIX + topic); + return peekAtSequence(key); } // TODO: The way this is called now is not ideal. It's all encapsulated and stuff, but, // before the txns (consistent HBase writes) are properly implemented, we should at least // put this in the same RPC with real updates. But there are no guarantees anyway, so... public void incrementChangeVersion(String topic) throws IOException { - byte[] key = HBaseUtils.buildKey(topic); - byte[] serialized = read(CHANGE_VERSION_TABLE, key, CATALOG_CF, CV_COL); - long val = 0; - if (serialized != null) { - val = Long.valueOf(new String(serialized, HBaseUtils.ENCODING)); - } - store(CHANGE_VERSION_TABLE, key, CATALOG_CF, CV_COL, - new Long(val + 1).toString().getBytes(HBaseUtils.ENCODING)); + byte[] key = HBaseUtils.buildKey(CHANGE_VERSION_SEQUENCE_PREFIX + topic); + getNextSequence(key); } /********************************************************************************************** @@ -2400,14 +2393,19 @@ void deleteMasterKey(Integer seqNo) throws IOException { * Sequence methods *********************************************************************************************/ + long peekAtSequence(byte[] sequence) throws IOException { + byte[] serialized = read(SEQUENCES_TABLE, sequence, CATALOG_CF, CATALOG_COL); + return serialized == null ? 0 : Long.valueOf(new String(serialized, HBaseUtils.ENCODING)); + } + long getNextSequence(byte[] sequence) throws IOException { - byte[] serialized = read(SEQUENCES_TABLE, SEQUENCES_KEY, CATALOG_CF, sequence); + byte[] serialized = read(SEQUENCES_TABLE, sequence, CATALOG_CF, CATALOG_COL); long val = 0; if (serialized != null) { val = Long.valueOf(new String(serialized, HBaseUtils.ENCODING)); } byte[] incrSerialized = new Long(val + 1).toString().getBytes(HBaseUtils.ENCODING); - store(SEQUENCES_TABLE, SEQUENCES_KEY, CATALOG_CF, sequence, incrSerialized); + store(SEQUENCES_TABLE, sequence, CATALOG_CF, CATALOG_COL, incrSerialized); return val; } @@ -2418,16 +2416,18 @@ long getNextSequence(byte[] sequence) throws IOException { */ List printSequences() throws IOException { HTableInterface htab = conn.getHBaseTable(SEQUENCES_TABLE); - Get g = new Get(SEQUENCES_KEY); - g.addFamily(CATALOG_CF); - Result result = htab.get(g); - if (result.isEmpty()) return Arrays.asList("No sequences"); - List lines = new ArrayList<>(); - for (Map.Entry entry : result.getFamilyMap(CATALOG_CF).entrySet()) { - lines.add(new String(entry.getKey(), HBaseUtils.ENCODING) + ": " + - new String(entry.getValue(), HBaseUtils.ENCODING)); + Iterator iter = + scan(SEQUENCES_TABLE, CATALOG_CF, CATALOG_COL, null); + List sequences = new ArrayList<>(); + if (!iter.hasNext()) return Arrays.asList("No sequences"); + while (iter.hasNext()) { + Result result = iter.next(); + sequences.add(new StringBuilder(new String(result.getRow(), HBaseUtils.ENCODING)) + .append(": ") + .append(new String(result.getValue(CATALOG_CF, CATALOG_COL), HBaseUtils.ENCODING)) + .toString()); } - return lines; + return sequences; } /**********************************************************************************************