commit 90399056f3762b2c8ca05277fb7dc0c35d8a24c3 Author: Daniel Dai Date: Wed Apr 27 16:40:19 2016 -0700 HIVE-13631: Support index in HBase Metastore diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/hbase/TestHBaseImport.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/hbase/TestHBaseImport.java index d14010d..36b2fad 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/hbase/TestHBaseImport.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/hbase/TestHBaseImport.java @@ -27,6 +27,7 @@ import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.Function; import org.apache.hadoop.hive.metastore.api.FunctionType; +import org.apache.hadoop.hive.metastore.api.Index; import org.apache.hadoop.hive.metastore.api.InvalidObjectException; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; @@ -63,6 +64,7 @@ private static final String[] tableNames = new String[] {"allnonparttable", "allparttable"}; private static final String[] partVals = new String[] {"na", "emea", "latam", "apac"}; private static final String[] funcNames = new String[] {"allfunc1", "allfunc2"}; + private static final String[] indexNames = new String[] {"allindex1", "allindex2"}; private static final List masterKeySeqs = new ArrayList(); @Rule @@ -145,7 +147,11 @@ public void importAll() throws Exception { } Assert.assertEquals(4, store.getPartitions(dbNames[i], tableNames[1], -1).size()); - Assert.assertEquals(2, store.getAllTables(dbNames[i]).size()); + // Including the two index tables + Assert.assertEquals(4, store.getAllTables(dbNames[i]).size()); + + Assert.assertEquals(2, store.getIndexes(dbNames[i], tableNames[0], -1).size()); + Assert.assertEquals(0, store.getIndexes(dbNames[i], tableNames[1], -1).size()); Assert.assertEquals(2, store.getFunctions(dbNames[i], "*").size()); for (int j = 0; j < funcNames.length; j++) { @@ -217,7 +223,11 @@ public void importOneDb() throws Exception { } Assert.assertEquals(4, store.getPartitions(dbNames[0], tableNames[1], -1).size()); - Assert.assertEquals(2, store.getAllTables(dbNames[0]).size()); + // Including the two index tables + Assert.assertEquals(4, store.getAllTables(dbNames[0]).size()); + + Assert.assertEquals(2, store.getIndexes(dbNames[0], tableNames[0], -1).size()); + Assert.assertEquals(0, store.getIndexes(dbNames[0], tableNames[1], -1).size()); Assert.assertEquals(2, store.getFunctions(dbNames[0], "*").size()); for (int j = 0; j < funcNames.length; j++) { @@ -322,6 +332,9 @@ public void importOneTableNonPartitioned() throws Exception { Assert.assertEquals(1, store.getAllTables(db.getName()).size()); Assert.assertNull(store.getTable(db.getName(), tableNames[1])); + List indexes = store.getIndexes(db.getName(), tableNames[0], -1); + Assert.assertEquals(2, indexes.size()); + Assert.assertEquals(0, store.getFunctions(dbNames[0], "*").size()); Assert.assertEquals(baseNumDbs + 1, store.getAllDatabases().size()); @@ -378,6 +391,9 @@ public void importOneTablePartitioned() throws Exception { Assert.assertNull(store.getTable(db.getName(), tableNames[0])); + List indexes = store.getIndexes(db.getName(), tableNames[1], -1); + Assert.assertEquals(0, indexes.size()); + Assert.assertEquals(0, store.getFunctions(dbNames[0], "*").size()); Assert.assertEquals(baseNumDbs + 1, store.getAllDatabases().size()); @@ -510,6 +526,15 @@ private void setupObjectStore(RawStore rdbms, String[] roles,
String[] dbNames, PrincipalType.USER, (int) System.currentTimeMillis() / 1000, FunctionType.JAVA, Arrays.asList(new ResourceUri(ResourceType.JAR, "uri")))); } + + for (String indexName : indexNames) { + LOG.debug("Creating new index " + dbNames[i] + "." + tableNames[0] + "." + indexName); + String indexTableName = tableNames[0] + "__" + indexName + "__"; + rdbms.createTable(new Table(indexTableName, dbNames[i], "me", now, now, 0, sd, partCols, + emptyParameters, null, null, null)); + rdbms.addIndex(new Index(indexName, null, dbNames[i], tableNames[0], + now, now, indexTableName, sd, emptyParameters, false)); + } } for (int i = 0; i < tokenIds.length; i++) rdbms.addToken(tokenIds[i], tokens[i]); for (int i = 0; i < masterKeys.length; i++) { diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseImport.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseImport.java index 434bd9e..b005b4e 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseImport.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseImport.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hive.metastore.hbase; import com.google.common.annotations.VisibleForTesting; + import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.GnuParser; import org.apache.commons.cli.HelpFormatter; @@ -35,6 +36,7 @@ import org.apache.hadoop.hive.metastore.RawStore; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.Function; +import org.apache.hadoop.hive.metastore.api.Index; import org.apache.hadoop.hive.metastore.api.InvalidObjectException; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; @@ -118,6 +120,7 @@ protected RawStore initialValue() { private List dbs; private BlockingQueue partitionedTables; private BlockingQueue tableNameQueue; + private BlockingQueue indexNameQueue; private BlockingQueue partQueue; private boolean writingToQueue, readersFinished; private boolean doKerberos, doAll; @@ -239,6 +242,7 @@ private int init(String... args) throws ParseException { // We don't want to bound the size of the table queue because we keep it all in memory partitionedTables = new LinkedBlockingQueue<>(); tableNameQueue = new LinkedBlockingQueue<>(); + indexNameQueue = new LinkedBlockingQueue<>(); // Bound the size of this queue so we don't get too much in memory. 
partQueue = new ArrayBlockingQueue<>(parallel * 2); @@ -263,6 +267,7 @@ void run() throws MetaException, InstantiationException, IllegalAccessException, if (doAll || dbsToImport != null || tablesToImport != null) { copyTables(); copyPartitions(); + copyIndexes(); } if (doAll || dbsToImport != null || functionsToImport != null) { copyFunctions(); @@ -371,6 +376,66 @@ public void run() { } } + private void copyIndexes() throws MetaException, InvalidObjectException, InterruptedException { + screen("Copying indexes"); + + // Start the parallel threads that will copy the indexes + Thread[] copiers = new Thread[parallel]; + writingToQueue = true; + for (int i = 0; i < parallel; i++) { + copiers[i] = new IndexCopier(); + copiers[i].start(); + } + + // Put indexes from the databases we copied into the queue + for (Database db : dbs) { + screen("Copying indexes in database " + db.getName()); + for (String tableName : rdbmsStore.get().getAllTables(db.getName())) { + for (Index index : rdbmsStore.get().getIndexes(db.getName(), tableName, -1)) { + indexNameQueue.put(new String[]{db.getName(), tableName, index.getIndexName()}); + } + } + } + + // Now put the indexes of any specifically requested tables into the queue + if (tablesToImport != null) { + for (String compoundTableName : tablesToImport) { + String[] tn = compoundTableName.split("\\."); + if (tn.length != 2) { + error(compoundTableName + " not in proper form. Must be in form dbname.tablename. " + + "Ignoring this table and continuing."); + } else { + for (Index index : rdbmsStore.get().getIndexes(tn[0], tn[1], -1)) { + indexNameQueue.put(new String[]{tn[0], tn[1], index.getIndexName()}); + } + } + } + } + + writingToQueue = false; + + // Wait until the copiers have finished copying all the indexes + for (Thread copier : copiers) copier.join(); + } + + private class IndexCopier extends Thread { + @Override + public void run() { + while (writingToQueue || indexNameQueue.size() > 0) { + try { + String[] name = indexNameQueue.poll(1, TimeUnit.SECONDS); + if (name != null) { + Index index = rdbmsStore.get().getIndex(name[0], name[1], name[2]); + screen("Copying index " + name[0] + "." + name[1] + "." + name[2]); + hbaseStore.get().addIndex(index); + } + } catch (InterruptedException | MetaException | InvalidObjectException e) { + throw new RuntimeException(e); + } + } + } + } + /* Partition copying is a little complex. As we went through and copied the tables we put each * partitioned table into a queue. We will now go through that queue and add partitions for the * tables. We do the finding of partitions and writing of them separately and in parallel.
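The copyIndexes/IndexCopier pair above reuses the producer/consumer handshake HBaseImport already uses for tables and partitions: the driver sets writingToQueue before starting the copier threads, fills the queue, clears the flag, and joins the copiers, while each copier polls with a timeout so it keeps re-checking the flag and drains whatever is left once writing stops. A minimal, self-contained sketch of that handshake (class, queue, and item names are illustrative, not part of the patch; the flag is made volatile here as an assumption for visibility rather than copied from HBaseImport):

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class QueueHandshakeSketch {
  private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
  private volatile boolean writingToQueue;

  // Consumer: keeps running while the producer is still writing or items remain queued.
  private class Copier extends Thread {
    @Override
    public void run() {
      while (writingToQueue || !queue.isEmpty()) {
        try {
          // Poll with a timeout so the loop periodically re-checks writingToQueue.
          String item = queue.poll(1, TimeUnit.SECONDS);
          if (item != null) {
            System.out.println("copying " + item);
          }
        } catch (InterruptedException e) {
          throw new RuntimeException(e);
        }
      }
    }
  }

  public void copyAll(Iterable<String> items) throws InterruptedException {
    writingToQueue = true;        // set before any copier starts, as copyIndexes() does
    Copier copier = new Copier();
    copier.start();
    for (String item : items) {
      queue.put(item);            // producer side: enqueue work
    }
    writingToQueue = false;       // signal that no more items are coming
    copier.join();                // wait for the queue to drain
  }
}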
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseReadWrite.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseReadWrite.java index 2860875..7901bde 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseReadWrite.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseReadWrite.java @@ -50,6 +50,7 @@ import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.Function; +import org.apache.hadoop.hive.metastore.api.Index; import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.PrincipalPrivilegeSet; @@ -98,6 +99,7 @@ final static String SECURITY_TABLE = "HBMS_SECURITY"; final static String SEQUENCES_TABLE = "HBMS_SEQUENCES"; final static String TABLE_TABLE = "HBMS_TBLS"; + final static String INDEX_TABLE = "HBMS_INDEX"; final static String USER_TO_ROLE_TABLE = "HBMS_USER_TO_ROLE"; final static String FILE_METADATA_TABLE = "HBMS_FILE_METADATA"; final static byte[] CATALOG_CF = "c".getBytes(HBaseUtils.ENCODING); @@ -109,7 +111,7 @@ public final static String[] tableNames = { AGGR_STATS_TABLE, DB_TABLE, FUNC_TABLE, GLOBAL_PRIVS_TABLE, PART_TABLE, USER_TO_ROLE_TABLE, ROLE_TABLE, SD_TABLE, SECURITY_TABLE, SEQUENCES_TABLE, - TABLE_TABLE, FILE_METADATA_TABLE }; + TABLE_TABLE, INDEX_TABLE, FILE_METADATA_TABLE }; public final static Map> columnFamilies = new HashMap<> (tableNames.length); static { @@ -124,6 +126,7 @@ columnFamilies.put(SECURITY_TABLE, Arrays.asList(CATALOG_CF)); columnFamilies.put(SEQUENCES_TABLE, Arrays.asList(CATALOG_CF)); columnFamilies.put(TABLE_TABLE, Arrays.asList(CATALOG_CF, STATS_CF)); + columnFamilies.put(INDEX_TABLE, Arrays.asList(CATALOG_CF, STATS_CF)); // Stats CF will contain PPD stats. columnFamilies.put(FILE_METADATA_TABLE, Arrays.asList(CATALOG_CF, STATS_CF)); } @@ -1745,6 +1748,119 @@ private Table getTable(String dbName, String tableName, boolean populateCache) } /********************************************************************************************** + * Index related methods + *********************************************************************************************/ + + /** + * Put an index object. This should only be called when the index is new (create index) as it + * will blindly add/increment the storage descriptor. If you are altering an existing index + * call {@link #replaceIndex} instead. 
+ * @param index index object + * @throws IOException + */ + void putIndex(Index index) throws IOException { + byte[] hash = putStorageDescriptor(index.getSd()); + byte[][] serialized = HBaseUtils.serializeIndex(index, hash); + store(INDEX_TABLE, serialized[0], CATALOG_CF, CATALOG_COL, serialized[1]); + } + + /** + * Fetch an index object + * @param dbName database the table is in + * @param origTableName original table name + * @param indexName index name + * @return Index object, or null if no such index + * @throws IOException + */ + Index getIndex(String dbName, String origTableName, String indexName) throws IOException { + byte[] key = HBaseUtils.buildKey(dbName, origTableName, indexName); + byte[] serialized = read(INDEX_TABLE, key, CATALOG_CF, CATALOG_COL); + if (serialized == null) return null; + HBaseUtils.StorageDescriptorParts sdParts = + HBaseUtils.deserializeIndex(dbName, origTableName, indexName, serialized); + StorageDescriptor sd = getStorageDescriptor(sdParts.sdHash); + HBaseUtils.assembleStorageDescriptor(sd, sdParts); + return sdParts.containingIndex; + } + + /** + * Delete an index + * @param dbName name of database the table is in + * @param origTableName table the index is built on + * @param indexName index name + * @throws IOException + */ + void deleteIndex(String dbName, String origTableName, String indexName) throws IOException { + deleteIndex(dbName, origTableName, indexName, true); + } + + void deleteIndex(String dbName, String origTableName, String indexName, boolean decrementRefCnt) + throws IOException { + // Find the index so I can get the storage descriptor and drop it + if (decrementRefCnt) { + Index index = getIndex(dbName, origTableName, indexName); + decrementStorageDescriptorRefCount(index.getSd()); + } + byte[] key = HBaseUtils.buildKey(dbName, origTableName, indexName); + delete(INDEX_TABLE, key, null, null); + } + + /** + * Get a list of indexes. + * @param dbName Database these indexes are in + * @param origTableName original table name + * @param maxResults max indexes to fetch. If negative all indexes will be returned. + * @return list of indexes of the table + * @throws IOException + */ + List scanIndexes(String dbName, String origTableName, int maxResults) throws IOException { + // There's no way to know whether all the indexes we are looking for are + // in the cache, so we would need to scan one way or another. Thus there's no value in hitting + // the cache for this function. + byte[] keyPrefix = null; + if (dbName != null) { + keyPrefix = HBaseUtils.buildKeyWithTrailingSeparator(dbName, origTableName); + } + Iterator iter = scan(INDEX_TABLE, keyPrefix, HBaseUtils.getEndPrefix(keyPrefix), + CATALOG_CF, CATALOG_COL, null); + List indexes = new ArrayList<>(); + int numToFetch = maxResults < 0 ? Integer.MAX_VALUE : maxResults; + for (int i = 0; i < numToFetch && iter.hasNext(); i++) { + Result result = iter.next(); + HBaseUtils.StorageDescriptorParts sdParts = HBaseUtils.deserializeIndex(result.getRow(), + result.getValue(CATALOG_CF, CATALOG_COL)); + StorageDescriptor sd = getStorageDescriptor(sdParts.sdHash); + HBaseUtils.assembleStorageDescriptor(sd, sdParts); + indexes.add(sdParts.containingIndex); + } + return indexes; + } + + /** + * Replace an existing index.
This will also compare the storage descriptors and see if the + * reference count needs to be adjusted + * @param oldIndex old version of the index + * @param newIndex new version of the index + */ + void replaceIndex(Index oldIndex, Index newIndex) throws IOException { + byte[] hash; + byte[] oldHash = HBaseUtils.hashStorageDescriptor(oldIndex.getSd(), md); + byte[] newHash = HBaseUtils.hashStorageDescriptor(newIndex.getSd(), md); + if (Arrays.equals(oldHash, newHash)) { + hash = oldHash; + } else { + decrementStorageDescriptorRefCount(oldIndex.getSd()); + hash = putStorageDescriptor(newIndex.getSd()); + } + byte[][] serialized = HBaseUtils.serializeIndex(newIndex, hash); + store(INDEX_TABLE, serialized[0], CATALOG_CF, CATALOG_COL, serialized[1]); + if (!(oldIndex.getDbName().equals(newIndex.getDbName()) && + oldIndex.getOrigTableName().equals(newIndex.getOrigTableName()) && + oldIndex.getIndexName().equals(newIndex.getIndexName()))) { + deleteIndex(oldIndex.getDbName(), oldIndex.getOrigTableName(), oldIndex.getIndexName(), false); + } + } + /********************************************************************************************** * StorageDescriptor related methods *********************************************************************************************/ diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java index 82ecf88..b260b3a 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java @@ -702,38 +702,128 @@ public void alterPartitions(String db_name, String tbl_name, List> @Override public boolean addIndex(Index index) throws InvalidObjectException, MetaException { - throw new UnsupportedOperationException(); + boolean commit = false; + openTransaction(); + try { + index.setDbName(HiveStringUtils.normalizeIdentifier(index.getDbName())); + index.setOrigTableName(HiveStringUtils.normalizeIdentifier(index.getOrigTableName())); + index.setIndexName(HiveStringUtils.normalizeIdentifier(index.getIndexName())); + index.setIndexTableName(HiveStringUtils.normalizeIdentifier(index.getIndexTableName())); + getHBase().putIndex(index); + commit = true; + } catch (IOException e) { + LOG.error("Unable to create index ", e); + throw new MetaException("Unable to read from or write to hbase " + e.getMessage()); + } finally { + commitOrRoleBack(commit); + } + return commit; } @Override public Index getIndex(String dbName, String origTableName, String indexName) throws MetaException { - throw new UnsupportedOperationException(); + boolean commit = false; + openTransaction(); + try { + Index index = getHBase().getIndex(HiveStringUtils.normalizeIdentifier(dbName), + HiveStringUtils.normalizeIdentifier(origTableName), + HiveStringUtils.normalizeIdentifier(indexName)); + if (index == null) { + LOG.debug("Unable to find index " + indexNameForErrorMsg(dbName, origTableName, indexName)); + } + commit = true; + return index; + } catch (IOException e) { + LOG.error("Unable to get index", e); + throw new MetaException("Error reading index " + e.getMessage()); + } finally { + commitOrRoleBack(commit); + } } @Override public boolean dropIndex(String dbName, String origTableName, String indexName) throws MetaException { - throw new UnsupportedOperationException(); + boolean commit = false; + openTransaction(); + try { + getHBase().deleteIndex(HiveStringUtils.normalizeIdentifier(dbName), + 
HiveStringUtils.normalizeIdentifier(origTableName), + HiveStringUtils.normalizeIdentifier(indexName)); + commit = true; + return true; + } catch (IOException e) { + LOG.error("Unable to delete index", e); + throw new MetaException("Unable to drop index " + + indexNameForErrorMsg(dbName, origTableName, indexName)); + } finally { + commitOrRoleBack(commit); + } } @Override public List getIndexes(String dbName, String origTableName, int max) throws MetaException { - // TODO - Index not currently supported. But I need to return an empty list or else drop - // table cores. - return new ArrayList(); + boolean commit = false; + openTransaction(); + try { + List indexes = getHBase().scanIndexes(HiveStringUtils.normalizeIdentifier(dbName), + HiveStringUtils.normalizeIdentifier(origTableName), max); + commit = true; + return indexes; + } catch (IOException e) { + LOG.error("Unable to get indexes", e); + throw new MetaException("Error scanning indexes"); + } finally { + commitOrRoleBack(commit); + } } @Override public List listIndexNames(String dbName, String origTableName, short max) throws MetaException { - throw new UnsupportedOperationException(); + boolean commit = false; + openTransaction(); + try { + List indexes = getHBase().scanIndexes(HiveStringUtils.normalizeIdentifier(dbName), + HiveStringUtils.normalizeIdentifier(origTableName), max); + if (indexes == null) return null; + List names = new ArrayList(indexes.size()); + for (Index index : indexes) { + names.add(index.getIndexName()); + } + commit = true; + return names; + } catch (IOException e) { + LOG.error("Unable to get indexes", e); + throw new MetaException("Error scanning indexes"); + } finally { + commitOrRoleBack(commit); + } } @Override public void alterIndex(String dbname, String baseTblName, String name, Index newIndex) throws InvalidObjectException, MetaException { - throw new UnsupportedOperationException(); + boolean commit = false; + openTransaction(); + try { + Index newIndexCopy = newIndex.deepCopy(); + newIndexCopy.setDbName(HiveStringUtils.normalizeIdentifier(newIndexCopy.getDbName())); + newIndexCopy.setOrigTableName( + HiveStringUtils.normalizeIdentifier(newIndexCopy.getOrigTableName())); + newIndexCopy.setIndexName(HiveStringUtils.normalizeIdentifier(newIndexCopy.getIndexName())); + getHBase().replaceIndex(getHBase().getIndex(HiveStringUtils.normalizeIdentifier(dbname), + HiveStringUtils.normalizeIdentifier(baseTblName), + HiveStringUtils.normalizeIdentifier(name)), newIndexCopy); + commit = true; + } catch (IOException e) { + LOG.error("Unable to alter index " + indexNameForErrorMsg(dbname, baseTblName, name), e); + throw new MetaException("Unable to alter index " + + indexNameForErrorMsg(dbname, baseTblName, name)); + } finally { + commitOrRoleBack(commit); + } } @Override @@ -2432,6 +2522,12 @@ private String partNameForErrorMsg(String dbName, String tableName, List return tableNameForErrorMsg(dbName, tableName) + "." + StringUtils.join(partVals, ':'); } + // This is for building error messages only. It does not look up anything in the metastore as + // they may just throw another error. + private String indexNameForErrorMsg(String dbName, String origTableName, String indexName) { + return tableNameForErrorMsg(dbName, origTableName) + "."
+ indexName; + } + private String buildExternalPartName(Table table, Partition part) { return buildExternalPartName(table, part.getValues()); } diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseUtils.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseUtils.java index e0b449b..947fc60 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseUtils.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseUtils.java @@ -51,6 +51,7 @@ import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.Function; import org.apache.hadoop.hive.metastore.api.FunctionType; +import org.apache.hadoop.hive.metastore.api.Index; import org.apache.hadoop.hive.metastore.api.LongColumnStatsData; import org.apache.hadoop.hive.metastore.api.Order; import org.apache.hadoop.hive.metastore.api.Partition; @@ -830,6 +831,7 @@ static StorageDescriptor deserializeStorageDescriptor(byte[] serialized) Map parameters; Partition containingPartition; Table containingTable; + Index containingIndex; } static void assembleStorageDescriptor(StorageDescriptor sd, StorageDescriptorParts parts) { @@ -841,7 +843,10 @@ static void assembleStorageDescriptor(StorageDescriptor sd, StorageDescriptorPar parts.containingPartition.setSd(ssd); } else if (parts.containingTable != null) { parts.containingTable.setSd(ssd); - } else { + } else if (parts.containingIndex != null) { + parts.containingIndex.setSd(ssd); + } + else { throw new RuntimeException("Need either a partition or a table"); } } @@ -1114,6 +1119,94 @@ static StorageDescriptorParts deserializeTable(String dbName, String tableName, return sdParts; } + /** + * Serialize an index + * @param index index object + * @param sdHash hash that is being used as a key for the enclosed storage descriptor + * @return First element is the key, second is the serialized index + */ + static byte[][] serializeIndex(Index index, byte[] sdHash) { + byte[][] result = new byte[2][]; + result[0] = buildKey(HiveStringUtils.normalizeIdentifier(index.getDbName()), + HiveStringUtils.normalizeIdentifier(index.getOrigTableName()), + HiveStringUtils.normalizeIdentifier(index.getIndexName())); + HbaseMetastoreProto.Index.Builder builder = HbaseMetastoreProto.Index.newBuilder(); + builder.setDbName(index.getDbName()); + builder.setOrigTableName(index.getOrigTableName()); + if (index.getSd().getLocation() != null) builder.setLocation(index.getSd().getLocation()); + if (index.getSd().getParameters() != null) { + builder.setSdParameters(buildParameters(index.getSd().getParameters())); + } + if (index.getIndexHandlerClass() != null) { + builder.setIndexHandlerClass(index.getIndexHandlerClass()); + } + if (index.getIndexTableName() != null) { + builder.setIndexTableName(index.getIndexTableName()); + } + builder + .setCreateTime(index.getCreateTime()) + .setLastAccessTime(index.getLastAccessTime()) + .setDeferredRebuild(index.isDeferredRebuild()); + if (index.getParameters() != null) { + builder.setParameters(buildParameters(index.getParameters())); + } + if (sdHash != null) { + builder.setSdHash(ByteString.copyFrom(sdHash)); + } + result[1] = builder.build().toByteArray(); + return result; + } + + /** + * Deserialize an index. This version should be used when the index key is not already + * known (eg a scan). 
+ * @param key the key fetched from HBase + * @param serialized the value fetched from HBase + * @return A struct that contains the index plus parts of the storage descriptor + */ + static StorageDescriptorParts deserializeIndex(byte[] key, byte[] serialized) + throws InvalidProtocolBufferException { + String[] keys = deserializeKey(key); + return deserializeIndex(keys[0], keys[1], keys[2], serialized); + } + + /** + * Deserialize an index. This version should be used when the table key is + * known (eg a get). + * @param dbName database name + * @param origTableName original table name + * @param indexName index name + * @param serialized the value fetched from HBase + * @return A struct that contains the index plus parts of the storage descriptor + */ + static StorageDescriptorParts deserializeIndex(String dbName, String origTableName, + String indexName, byte[] serialized) + throws InvalidProtocolBufferException { + HbaseMetastoreProto.Index proto = HbaseMetastoreProto.Index.parseFrom(serialized); + Index index = new Index(); + StorageDescriptorParts sdParts = new StorageDescriptorParts(); + sdParts.containingIndex = index; + index.setDbName(dbName); + index.setIndexName(indexName); + index.setOrigTableName(origTableName); + if (proto.hasLocation()) sdParts.location = proto.getLocation(); + if (proto.hasSdParameters()) sdParts.parameters = buildParameters(proto.getSdParameters()); + if (proto.hasIndexHandlerClass()) { + index.setIndexHandlerClass(proto.getIndexHandlerClass()); + } + if (proto.hasIndexTableName()) { + index.setIndexTableName(proto.getIndexTableName()); + } + index.setCreateTime(proto.getCreateTime()); + index.setLastAccessTime(proto.getLastAccessTime()); + index.setDeferredRebuild(proto.getDeferredRebuild()); + index.setParameters(buildParameters(proto.getParameters())); + if (proto.hasSdHash()) { + sdParts.sdHash = proto.getSdHash().toByteArray(); + } + return sdParts; + } + static byte[] serializeBloomFilter(String dbName, String tableName, BloomFilter bloom) { long[] bitSet = bloom.getBitSet(); List bits = new ArrayList<>(bitSet.length); diff --git a/metastore/src/protobuf/org/apache/hadoop/hive/metastore/hbase/hbase_metastore_proto.proto b/metastore/src/protobuf/org/apache/hadoop/hive/metastore/hbase/hbase_metastore_proto.proto index 466fdf9..6fbe36c 100644 --- a/metastore/src/protobuf/org/apache/hadoop/hive/metastore/hbase/hbase_metastore_proto.proto +++ b/metastore/src/protobuf/org/apache/hadoop/hive/metastore/hbase/hbase_metastore_proto.proto @@ -257,6 +257,20 @@ message Table { optional bool is_temporary = 14; } +message Index { + optional string indexHandlerClass = 1; // reserved + required string dbName = 2; + required string origTableName = 3; + optional string location = 4; + optional Parameters sd_parameters = 5; // storage descriptor parameters + optional int32 createTime = 6; + optional int32 lastAccessTime = 7; + optional string indexTableName = 8; + optional bytes sd_hash = 9; + optional Parameters parameters = 10; + optional bool deferredRebuild = 11; +} + message PartitionKeyComparator { required string names = 1; required string types = 2; diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/TestObjectStore.java b/metastore/src/test/org/apache/hadoop/hive/metastore/TestObjectStore.java index 2e1f5f4..c144246 100644 --- a/metastore/src/test/org/apache/hadoop/hive/metastore/TestObjectStore.java +++ b/metastore/src/test/org/apache/hadoop/hive/metastore/TestObjectStore.java @@ -26,6 +26,7 @@ import 
org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.FileMetadataExprType; import org.apache.hadoop.hive.metastore.api.Function; +import org.apache.hadoop.hive.metastore.api.Index; import org.apache.hadoop.hive.metastore.api.InvalidInputException; import org.apache.hadoop.hive.metastore.api.InvalidObjectException; import org.apache.hadoop.hive.metastore.api.MetaException; @@ -256,6 +257,12 @@ public static void dropAllStoreObjects(RawStore store) throws MetaException, Inv String db = dbs.get(i); List tbls = store.getAllTables(db); for (String tbl : tbls) { + List indexes = store.getIndexes(db, tbl, 100); + for (Index index : indexes) { + store.dropIndex(db, tbl, index.getIndexName()); + } + } + for (String tbl : tbls) { Deadline.startTimer("getPartition"); List parts = store.getPartitions(db, tbl, 100); for (Partition part : parts) { diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/hbase/TestHBaseStore.java b/metastore/src/test/org/apache/hadoop/hive/metastore/hbase/TestHBaseStore.java index e4723f6..4894ed3 100644 --- a/metastore/src/test/org/apache/hadoop/hive/metastore/hbase/TestHBaseStore.java +++ b/metastore/src/test/org/apache/hadoop/hive/metastore/hbase/TestHBaseStore.java @@ -44,6 +44,7 @@ import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.Function; import org.apache.hadoop.hive.metastore.api.FunctionType; +import org.apache.hadoop.hive.metastore.api.Index; import org.apache.hadoop.hive.metastore.api.LongColumnStatsData; import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; import org.apache.hadoop.hive.metastore.api.Order; @@ -757,6 +758,132 @@ public void dropPartition() throws Exception { } @Test + public void createIndex() throws Exception { + String tableName = "mytable"; + int startTime = (int)(System.currentTimeMillis() / 1000); + List cols = new ArrayList(); + cols.add(new FieldSchema("col1", "int", "")); + SerDeInfo serde = new SerDeInfo("serde", "seriallib", null); + Map params = new HashMap(); + params.put("key", "value"); + StorageDescriptor sd = new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 17, + serde, Arrays.asList("bucketcol"), Arrays.asList(new Order("sortcol", 1)), params); + Table table = new Table(tableName, "default", "me", startTime, startTime, 0, sd, null, + emptyParameters, null, null, null); + store.createTable(table); + + String indexName = "myindex"; + String indexTableName = tableName + "__" + indexName + "__"; + Index index = new Index(indexName, null, "default", tableName, startTime, startTime, + indexTableName, sd, emptyParameters, false); + store.addIndex(index); + + Index ind = store.getIndex("default", tableName, indexName); + Assert.assertEquals(1, ind.getSd().getColsSize()); + Assert.assertEquals("col1", ind.getSd().getCols().get(0).getName()); + Assert.assertEquals("int", ind.getSd().getCols().get(0).getType()); + Assert.assertEquals("", ind.getSd().getCols().get(0).getComment()); + Assert.assertEquals("serde", ind.getSd().getSerdeInfo().getName()); + Assert.assertEquals("seriallib", ind.getSd().getSerdeInfo().getSerializationLib()); + Assert.assertEquals("file:/tmp", ind.getSd().getLocation()); + Assert.assertEquals("input", ind.getSd().getInputFormat()); + Assert.assertEquals("output", ind.getSd().getOutputFormat()); + Assert.assertFalse(ind.getSd().isCompressed()); + Assert.assertEquals(17, ind.getSd().getNumBuckets()); + Assert.assertEquals(1, ind.getSd().getBucketColsSize()); + 
Assert.assertEquals("bucketcol", ind.getSd().getBucketCols().get(0)); + Assert.assertEquals(1, ind.getSd().getSortColsSize()); + Assert.assertEquals("sortcol", ind.getSd().getSortCols().get(0).getCol()); + Assert.assertEquals(1, ind.getSd().getSortCols().get(0).getOrder()); + Assert.assertEquals(1, ind.getSd().getParametersSize()); + Assert.assertEquals("value", ind.getSd().getParameters().get("key")); + Assert.assertEquals(indexName, ind.getIndexName()); + Assert.assertNull(ind.getIndexHandlerClass()); + Assert.assertEquals("default", ind.getDbName()); + Assert.assertEquals(tableName, ind.getOrigTableName()); + Assert.assertEquals(0, ind.getParametersSize()); + Assert.assertEquals(startTime, ind.getCreateTime()); + Assert.assertEquals(startTime, ind.getLastAccessTime()); + Assert.assertEquals(false, ind.isDeferredRebuild()); + } + + @Test + public void alterIndex() throws Exception { + String tableName = "mytable"; + int startTime = (int)(System.currentTimeMillis() / 1000); + List cols = new ArrayList(); + cols.add(new FieldSchema("col1", "int", "")); + SerDeInfo serde = new SerDeInfo("serde", "seriallib", null); + Map params = new HashMap(); + params.put("key", "value"); + StorageDescriptor sd = new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 17, + serde, Arrays.asList("bucketcol"), Arrays.asList(new Order("sortcol", 1)), params); + Table table = new Table(tableName, "default", "me", startTime, startTime, 0, sd, null, + emptyParameters, null, null, null); + store.createTable(table); + + String indexName = "myindex"; + Index index = new Index(indexName, null, "default", tableName, startTime, startTime, + tableName + "__" + indexName + "__", sd, emptyParameters, false); + store.addIndex(index); + + startTime += 10; + index.setLastAccessTime(startTime); + store.alterIndex("default", tableName, indexName, index); + + Index ind = store.getIndex("default", tableName, indexName); + Assert.assertEquals(1, ind.getSd().getColsSize()); + Assert.assertEquals("col1", ind.getSd().getCols().get(0).getName()); + Assert.assertEquals("int", ind.getSd().getCols().get(0).getType()); + Assert.assertEquals("", ind.getSd().getCols().get(0).getComment()); + Assert.assertEquals("serde", ind.getSd().getSerdeInfo().getName()); + Assert.assertEquals("seriallib", ind.getSd().getSerdeInfo().getSerializationLib()); + Assert.assertEquals("file:/tmp", ind.getSd().getLocation()); + Assert.assertEquals("input", ind.getSd().getInputFormat()); + Assert.assertEquals("output", ind.getSd().getOutputFormat()); + Assert.assertFalse(ind.getSd().isCompressed()); + Assert.assertEquals(17, ind.getSd().getNumBuckets()); + Assert.assertEquals(1, ind.getSd().getBucketColsSize()); + Assert.assertEquals("bucketcol", ind.getSd().getBucketCols().get(0)); + Assert.assertEquals(1, ind.getSd().getSortColsSize()); + Assert.assertEquals("sortcol", ind.getSd().getSortCols().get(0).getCol()); + Assert.assertEquals(1, ind.getSd().getSortCols().get(0).getOrder()); + Assert.assertEquals(1, ind.getSd().getParametersSize()); + Assert.assertEquals("value", ind.getSd().getParameters().get("key")); + Assert.assertEquals(indexName, ind.getIndexName()); + Assert.assertNull(ind.getIndexHandlerClass()); + Assert.assertEquals("default", ind.getDbName()); + Assert.assertEquals(tableName, ind.getOrigTableName()); + Assert.assertEquals(0, ind.getParametersSize()); + Assert.assertEquals(startTime, ind.getLastAccessTime()); + Assert.assertEquals(false, ind.isDeferredRebuild()); + } + + @Test + public void dropIndex() throws Exception { + String 
tableName = "mytable"; + int startTime = (int)(System.currentTimeMillis() / 1000); + List cols = new ArrayList(); + cols.add(new FieldSchema("col1", "int", "")); + SerDeInfo serde = new SerDeInfo("serde", "seriallib", null); + Map params = new HashMap(); + params.put("key", "value"); + StorageDescriptor sd = new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 17, + serde, Arrays.asList("bucketcol"), Arrays.asList(new Order("sortcol", 1)), params); + Table table = new Table(tableName, "default", "me", startTime, startTime, 0, sd, null, + emptyParameters, null, null, null); + store.createTable(table); + + String indexName = "myindex"; + Index index = new Index(indexName, null, "default", tableName, startTime, startTime, + tableName + "__" + indexName + "__", sd, emptyParameters, false); + store.addIndex(index); + + store.dropIndex("default", tableName, indexName); + + } + + @Test public void createRole() throws Exception { int now = (int)System.currentTimeMillis()/1000; String roleName = "myrole"; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java index 4c9acce..56048a0 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java @@ -1242,7 +1242,7 @@ public Table getTable(final String dbName, final String tableName, if (!TableType.VIRTUAL_VIEW.toString().equals(tTable.getTableType())) { // Fix the non-printable chars Map parameters = tTable.getSd().getParameters(); - String sf = parameters.get(SERIALIZATION_FORMAT); + String sf = parameters!=null?parameters.get(SERIALIZATION_FORMAT) : null; if (sf != null) { char[] b = sf.toCharArray(); if ((b.length == 1) && (b[0] < 10)) { // ^A, ^B, ^C, ^D, \t