diff --git itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/hbase/TestHBaseSchemaTool.java itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/hbase/TestHBaseSchemaTool.java
index 79c9e08..b131163 100644
--- itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/hbase/TestHBaseSchemaTool.java
+++ itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/hbase/TestHBaseSchemaTool.java
@@ -577,6 +577,6 @@ public void oneMondoTest() throws Exception {
     outStr = new ByteArrayOutputStream();
     out = new PrintStream(outStr);
     tool.go(false, HBaseReadWrite.SEQUENCES_TABLE, null, "whatever", conf, out, err);
-    Assert.assertEquals("mk: 1" + lsep, outStr.toString());
+    Assert.assertEquals("master_key: 1" + lsep, outStr.toString());
   }
 }
diff --git metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseConnection.java metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseConnection.java
index 696e588..51dbd69 100644
--- metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseConnection.java
+++ metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseConnection.java
@@ -71,9 +71,12 @@
    * Create a new table
    * @param tableName name of the table
    * @param columnFamilies name of the column families in the table
+   * @param isRaw if true, create a table directly against HBase, don't use the transaction
+   *              manager.
    * @throws IOException
    */
-  void createHBaseTable(String tableName, List<byte[]> columnFamilies) throws IOException;
+  void createHBaseTable(String tableName, List<byte[]> columnFamilies, boolean isRaw)
+      throws IOException;
 
   /**
    * Fetch an existing HBase table.
@@ -93,4 +96,15 @@
    */
   HTableInterface getHBaseTable(String tableName, boolean force) throws IOException;
 
+  /**
+   * Fetch a raw (non-transactional) connection to an HBase table.  You should only do this if
+   * you're sure you know what you're doing, as this circumvents the transactions.  Currently it
+   * is only used for the sequences table.  Using it for other tables could result in mayhem.
+   * @param tableName name of the table
+   * @param force if true, force a connection to HBase server
+   * @return table handle
+   * @throws IOException
+   */
+  HTableInterface getRawTable(String tableName, boolean force) throws IOException;
+
 }
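The split between `getHBaseTable` and `getRawTable` is the heart of this patch. A minimal caller-side sketch of the intended contract (the table names and the already-connected `conn` are assumptions for illustration, not part of this patch):

```java
import java.io.IOException;

import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hive.metastore.hbase.HBaseConnection;

// Sketch only: shows the intended contract of the two accessors.
public class RawVsTransactionalSketch {
  // Ordinary catalog tables go through the transaction manager, so writes are
  // buffered and only become visible when the surrounding transaction commits.
  static HTableInterface catalogHandle(HBaseConnection conn) throws IOException {
    return conn.getHBaseTable("HBMS_TBLS", false); // hypothetical table name
  }

  // The sequences table must talk to HBase directly: checkAndPut is an atomic
  // server-side operation that the transaction layer cannot provide.
  static HTableInterface sequenceHandle(HBaseConnection conn) throws IOException {
    return conn.getRawTable("HBMS_SEQUENCES", false); // hypothetical table name
  }
}
```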
diff --git metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseReadWrite.java metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseReadWrite.java
index 81f1324..b94121d 100644
--- metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseReadWrite.java
+++ metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseReadWrite.java
@@ -112,6 +112,13 @@
       TABLE_TABLE, FILE_METADATA_TABLE };
   public final static Map<String, List<byte[]>> columnFamilies = new HashMap<> (tableNames.length);
 
+  /**
+   * Maps whether the table should be raw (that is, direct to HBase, not through the transaction
+   * manager).  This is necessary for the sequences table, which needs to use checkAndPut, which
+   * isn't supported by the transaction managers.
+   */
+  final static Map<String, Boolean> rawTable = new HashMap<> (tableNames.length);
+
   static {
     columnFamilies.put(AGGR_STATS_TABLE, Arrays.asList(CATALOG_CF));
     columnFamilies.put(DB_TABLE, Arrays.asList(CATALOG_CF));
@@ -126,18 +133,31 @@
     columnFamilies.put(TABLE_TABLE, Arrays.asList(CATALOG_CF, STATS_CF));
     // Stats CF will contain PPD stats.
     columnFamilies.put(FILE_METADATA_TABLE, Arrays.asList(CATALOG_CF, STATS_CF));
+
+    rawTable.put(AGGR_STATS_TABLE, false);
+    rawTable.put(DB_TABLE, false);
+    rawTable.put(FUNC_TABLE, false);
+    rawTable.put(GLOBAL_PRIVS_TABLE, false);
+    rawTable.put(PART_TABLE, false);
+    rawTable.put(USER_TO_ROLE_TABLE, false);
+    rawTable.put(ROLE_TABLE, false);
+    rawTable.put(SD_TABLE, false);
+    rawTable.put(SECURITY_TABLE, false);
+    rawTable.put(SEQUENCES_TABLE, true);
+    rawTable.put(TABLE_TABLE, false);
+    rawTable.put(FILE_METADATA_TABLE, false);
   }
 
   final static byte[] AGGR_STATS_BLOOM_COL = "b".getBytes(HBaseUtils.ENCODING);
   private final static byte[] AGGR_STATS_STATS_COL = "s".getBytes(HBaseUtils.ENCODING);
-  final static byte[] MASTER_KEY_SEQUENCE = "mk".getBytes(HBaseUtils.ENCODING);
+  final static byte[] MASTER_KEY_SEQUENCE = "master_key".getBytes(HBaseUtils.ENCODING);
   private final static byte[] CATALOG_COL = "c".getBytes(HBaseUtils.ENCODING);
   private final static byte[] ROLES_COL = "roles".getBytes(HBaseUtils.ENCODING);
   private final static byte[] REF_COUNT_COL = "ref".getBytes(HBaseUtils.ENCODING);
   private final static byte[] DELEGATION_TOKEN_COL = "dt".getBytes(HBaseUtils.ENCODING);
   private final static byte[] MASTER_KEY_COL = "mk".getBytes(HBaseUtils.ENCODING);
+  private final static byte[] SEQUENCES_COL = "seq".getBytes(HBaseUtils.ENCODING);
   private final static byte[] GLOBAL_PRIVS_KEY = "gp".getBytes(HBaseUtils.ENCODING);
-  private final static byte[] SEQUENCES_KEY = "seq".getBytes(HBaseUtils.ENCODING);
   private final static int TABLES_TO_CACHE = 10;
   // False positives are very bad here because they cause us to invalidate entries we shouldn't.
   // Space used and # of hash functions grows in proportion to ln of num bits so a 10x increase
@@ -289,7 +309,7 @@ static synchronized void createTablesIfNotExist() throws IOException {
     for (String name : tableNames) {
       if (self.get().conn.getHBaseTable(name, true) == null) {
         List<byte[]> families = columnFamilies.get(name);
-        self.get().conn.createHBaseTable(name, families);
+        self.get().conn.createHBaseTable(name, families, rawTable.get(name));
       }
     }
     tablesCreated = true;
@@ -2372,13 +2392,23 @@ void deleteMasterKey(Integer seqNo) throws IOException {
   *********************************************************************************************/
 
   long getNextSequence(byte[] sequence) throws IOException {
-    byte[] serialized = read(SEQUENCES_TABLE, SEQUENCES_KEY, CATALOG_CF, sequence);
-    long val = 0;
-    if (serialized != null) {
-      val = Long.valueOf(new String(serialized, HBaseUtils.ENCODING));
-    }
-    byte[] incrSerialized = new Long(val + 1).toString().getBytes(HBaseUtils.ENCODING);
-    store(SEQUENCES_TABLE, SEQUENCES_KEY, CATALOG_CF, sequence, incrSerialized);
+    HTableInterface htab = conn.getRawTable(SEQUENCES_TABLE, false);
+    byte[] serialized;
+    Put p;
+    long val;
+    do {
+      Get g = new Get(sequence);
+      g.addColumn(CATALOG_CF, SEQUENCES_COL);
+      Result res = htab.get(g);
+      serialized = res.getValue(CATALOG_CF, SEQUENCES_COL);
+      val = 0;
+      if (serialized != null) {
+        val = Long.valueOf(new String(serialized, HBaseUtils.ENCODING));
+      }
+      p = new Put(sequence);
+      byte[] incrSerialized = new Long(val + 1).toString().getBytes(HBaseUtils.ENCODING);
+      p.add(CATALOG_CF, SEQUENCES_COL, incrSerialized);
+    } while (!htab.checkAndPut(sequence, CATALOG_CF, SEQUENCES_COL, serialized, p));
     return val;
   }
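The do/while above is an optimistic compare-and-set loop: read the current value, propose value + 1, and let the region server atomically verify the cell still holds what was read; if another writer got in first, `checkAndPut` returns false and the loop retries. A self-contained sketch of the same pattern, with placeholder family and qualifier names rather than the metastore's constants:

```java
import java.io.IOException;

import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

// Standalone sketch of the compare-and-set increment used in getNextSequence.
public class CasIncrement {
  private static final byte[] CF = Bytes.toBytes("c");    // placeholder family
  private static final byte[] COL = Bytes.toBytes("seq"); // placeholder qualifier

  static long nextValue(HTableInterface htab, byte[] row) throws IOException {
    while (true) {
      // Read the current value; null means the cell doesn't exist yet.
      Get g = new Get(row);
      g.addColumn(CF, COL);
      Result res = htab.get(g);
      byte[] current = res.getValue(CF, COL);
      long val = (current == null) ? 0 : Long.parseLong(Bytes.toString(current));

      // Propose val + 1, applied only if nobody changed the cell since our read.
      Put p = new Put(row);
      p.add(CF, COL, Bytes.toBytes(Long.toString(val + 1)));
      // checkAndPut is atomic on the region server; a null expected value means
      // "only put if the cell does not exist yet".
      if (htab.checkAndPut(row, CF, COL, current, p)) {
        return val;
      }
      // Lost the race: another writer bumped the sequence first; retry.
    }
  }
}
```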
@@ -2388,15 +2418,20 @@ long getNextSequence(byte[] sequence) throws IOException {
    * @throws IOException
    */
   List<String> printSequences() throws IOException {
-    HTableInterface htab = conn.getHBaseTable(SEQUENCES_TABLE);
-    Get g = new Get(SEQUENCES_KEY);
-    g.addFamily(CATALOG_CF);
-    Result result = htab.get(g);
-    if (result.isEmpty()) return Arrays.asList("No sequences");
+    HTableInterface htab = conn.getRawTable(SEQUENCES_TABLE, false);
+    Scan s = new Scan();
+    // Don't set a start or end, we want to see it all
+    s.addColumn(CATALOG_CF, SEQUENCES_COL);
+    Iterator<Result> iter = htab.getScanner(s).iterator();
+    if (!iter.hasNext()) return Arrays.asList("No sequences");
     List<String> lines = new ArrayList<>();
-    for (Map.Entry<byte[], byte[]> entry : result.getFamilyMap(CATALOG_CF).entrySet()) {
-      lines.add(new String(entry.getKey(), HBaseUtils.ENCODING) + ": " +
-          new String(entry.getValue(), HBaseUtils.ENCODING));
+    while (iter.hasNext()) {
+      Result result = iter.next();
+      lines.add(
+          new StringBuilder(new String(result.getRow(), HBaseUtils.ENCODING))
+              .append(": ")
+              .append(new String(result.getValue(CATALOG_CF, SEQUENCES_COL), HBaseUtils.ENCODING))
+              .toString());
     }
     return lines;
   }
@@ -2483,31 +2518,6 @@ private void multiRead(String table, byte[] colFam, byte[] colName,
     }
   }
 
-  private void multiModify(String table, byte[][] keys, byte[] colFam,
-      byte[] colName, List<ByteBuffer> values) throws IOException, InterruptedException {
-    assert values == null || keys.length == values.size();
-    // HBase APIs are weird. To supply bytebuffer value, you have to also have bytebuffer
-    // column name, but not column family. So there. Perhaps we should add these to constants too.
-    ByteBuffer colNameBuf = ByteBuffer.wrap(colName);
-    @SuppressWarnings("deprecation")
-    HTableInterface htab = conn.getHBaseTable(table);
-    List<Row> actions = new ArrayList<>(keys.length);
-    for (int i = 0; i < keys.length; ++i) {
-      ByteBuffer value = (values != null) ? values.get(i) : null;
-      if (value == null) {
-        actions.add(new Delete(keys[i]));
-      } else {
-        Put p = new Put(keys[i]);
-        p.addColumn(colFam, colNameBuf, HConstants.LATEST_TIMESTAMP, value);
-        actions.add(p);
-      }
-    }
-    Object[] results = new Object[keys.length];
-    htab.batch(actions, results);
-    // TODO: should we check results array? we don't care about partial results
-    conn.flush(htab);
-  }
-
   private Result read(String table, byte[] key, byte[] colFam, byte[][] colNames)
       throws IOException {
     HTableInterface htab = conn.getHBaseTable(table);
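`printSequences` now assumes one row per sequence, keyed by the sequence name, with the counter in a single column, so listing sequences becomes an unbounded scan. A standalone sketch of reading that layout (placeholder names again; it also closes the scanner, which releases server-side resources):

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

// Sketch of scanning a row-per-sequence table; not metastore code.
public class ScanSequences {
  static List<String> dump(HTableInterface htab) throws IOException {
    Scan s = new Scan(); // no start/stop row: visit every sequence
    s.addColumn(Bytes.toBytes("c"), Bytes.toBytes("seq"));
    List<String> lines = new ArrayList<>();
    ResultScanner scanner = htab.getScanner(s);
    try {
      for (Result r : scanner) {
        // Row key is the sequence name; the cell holds the counter as text.
        lines.add(Bytes.toString(r.getRow()) + ": "
            + Bytes.toString(r.getValue(Bytes.toBytes("c"), Bytes.toBytes("seq"))));
      }
    } finally {
      scanner.close(); // scanners hold server-side resources
    }
    return lines;
  }
}
```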
diff --git metastore/src/java/org/apache/hadoop/hive/metastore/hbase/TephraHBaseConnection.java metastore/src/java/org/apache/hadoop/hive/metastore/hbase/TephraHBaseConnection.java
index f66200f..0e59e62 100644
--- metastore/src/java/org/apache/hadoop/hive/metastore/hbase/TephraHBaseConnection.java
+++ metastore/src/java/org/apache/hadoop/hive/metastore/hbase/TephraHBaseConnection.java
@@ -70,7 +70,9 @@ public void connect() throws IOException {
           new ThreadLocalClientProvider(conf, new InMemoryDiscoveryService()));
     }
     for (String tableName : HBaseReadWrite.tableNames) {
-      txnTables.put(tableName, new TransactionAwareHTable(super.getHBaseTable(tableName, true)));
+      if (!HBaseReadWrite.rawTable.get(tableName)) {
+        txnTables.put(tableName, new TransactionAwareHTable(super.getHBaseTable(tableName, true)));
+      }
     }
     txn = new TransactionContext(txnClient, txnTables.values());
   }
@@ -111,9 +113,10 @@ public void flush(HTableInterface htab) throws IOException {
   }
 
   @Override
-  protected HTableDescriptor buildDescriptor(String tableName, List<byte[]> columnFamilies)
-      throws IOException {
-    HTableDescriptor tableDesc = super.buildDescriptor(tableName, columnFamilies);
+  protected HTableDescriptor buildDescriptor(String tableName, List<byte[]> columnFamilies,
+      boolean isRaw) throws IOException {
+    HTableDescriptor tableDesc = super.buildDescriptor(tableName, columnFamilies, isRaw);
+    if (isRaw) return tableDesc;
     tableDesc.addCoprocessor(TransactionProcessor.class.getName());
     return tableDesc;
   }
@@ -124,4 +127,6 @@ public HTableInterface getHBaseTable(String tableName, boolean force) throws IOE
     return (TransactionAwareHTable)txnTables.get(tableName);
   }
 
+  // Intentionally don't override getRawTable so that it is served by VanillaHBaseConnection.
+
 }
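Under Tephra, the `isRaw` flag has two effects: raw tables are never wrapped in a `TransactionAwareHTable`, and `buildDescriptor` returns before attaching the transaction coprocessor. A condensed sketch of the descriptor branch (the coprocessor class name is passed in as a parameter here because in the real code it comes from the Tephra dependency):

```java
import java.io.IOException;

import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.util.Bytes;

// Condensed sketch of the two buildDescriptor paths; not metastore code.
public class DescriptorSketch {
  static HTableDescriptor describe(String table, boolean isRaw,
      String txnCoprocessorClass) throws IOException {
    HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(table));
    desc.addFamily(new HColumnDescriptor(Bytes.toBytes("c"))); // placeholder family
    // Raw tables (currently just the sequences table) are left untouched so
    // that checkAndPut hits HBase directly.
    if (!isRaw) {
      // Transactional tables get the transaction manager's hooks, e.g.
      // TransactionProcessor.class.getName() in TephraHBaseConnection.
      desc.addCoprocessor(txnCoprocessorClass);
    }
    return desc;
  }
}
```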
diff --git metastore/src/java/org/apache/hadoop/hive/metastore/hbase/VanillaHBaseConnection.java metastore/src/java/org/apache/hadoop/hive/metastore/hbase/VanillaHBaseConnection.java
index e631580..5926362 100644
--- metastore/src/java/org/apache/hadoop/hive/metastore/hbase/VanillaHBaseConnection.java
+++ metastore/src/java/org/apache/hadoop/hive/metastore/hbase/VanillaHBaseConnection.java
@@ -18,8 +18,6 @@
  */
 package org.apache.hadoop.hive.metastore.hbase;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HTableDescriptor;
@@ -29,7 +27,8 @@
 import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.hadoop.hbase.client.HConnectionManager;
 import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.Result;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.HashMap;
@@ -82,16 +81,16 @@ public void flush(HTableInterface htab) throws IOException {
   }
 
   @Override
-  public void createHBaseTable(String tableName, List<byte[]> columnFamilies)
+  public void createHBaseTable(String tableName, List<byte[]> columnFamilies, boolean isRaw)
       throws IOException {
     HBaseAdmin admin = new HBaseAdmin(conn);
     LOG.info("Creating HBase table " + tableName);
-    admin.createTable(buildDescriptor(tableName, columnFamilies));
+    admin.createTable(buildDescriptor(tableName, columnFamilies, isRaw));
     admin.close();
   }
 
-  protected HTableDescriptor buildDescriptor(String tableName, List<byte[]> columnFamilies)
-      throws IOException {
+  protected HTableDescriptor buildDescriptor(String tableName, List<byte[]> columnFamilies,
+      boolean isRaw) throws IOException {
     HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(tableName));
     for (byte[] cf : columnFamilies) {
       tableDesc.addFamily(new HColumnDescriptor(cf));
@@ -126,6 +125,11 @@ public HTableInterface getHBaseTable(String tableName, boolean force) throws IOE
   }
 
   @Override
+  public HTableInterface getRawTable(String tableName, boolean force) throws IOException {
+    return getHBaseTable(tableName, force);
+  }
+
+  @Override
   public void setConf(Configuration conf) {
     this.conf = conf;
   }