Index: metastore/src/test/org/apache/hadoop/hive/metastore/TestHiveMetaStore.java
===================================================================
--- metastore/src/test/org/apache/hadoop/hive/metastore/TestHiveMetaStore.java	(revision 985768)
+++ metastore/src/test/org/apache/hadoop/hive/metastore/TestHiveMetaStore.java	(working copy)
@@ -181,12 +181,15 @@
     client.createTable(tbl);

+    Table tbl_aftercreate;
     if(isThriftClient) {
       // the createTable() above does not update the location in the 'tbl'
       // object when the client is a thrift client and the code below relies
       // on the location being present in the 'tbl' object - so get the table
       // from the metastore
-      tbl = client.getTable(dbName, tblName);
+      tbl_aftercreate = client.getTable(dbName, tblName); //MARK
+    } else {
+      tbl_aftercreate = tbl;
     }

     Partition part = new Partition();
@@ -194,27 +197,27 @@
     part.setTableName(tblName);
     part.setValues(vals);
     part.setParameters(new HashMap());
-    part.setSd(tbl.getSd());
-    part.getSd().setSerdeInfo(tbl.getSd().getSerdeInfo());
-    part.getSd().setLocation(tbl.getSd().getLocation() + "/part1");
+    part.setSd(tbl_aftercreate.getSd());
+    part.getSd().setSerdeInfo(tbl_aftercreate.getSd().getSerdeInfo());
+    part.getSd().setLocation(tbl_aftercreate.getSd().getLocation() + "/part1");

     Partition part2 = new Partition();
     part2.setDbName(dbName);
     part2.setTableName(tblName);
     part2.setValues(vals2);
     part2.setParameters(new HashMap());
-    part2.setSd(tbl.getSd());
-    part2.getSd().setSerdeInfo(tbl.getSd().getSerdeInfo());
-    part2.getSd().setLocation(tbl.getSd().getLocation() + "/part2");
+    part2.setSd(tbl_aftercreate.getSd());
+    part2.getSd().setSerdeInfo(tbl_aftercreate.getSd().getSerdeInfo());
+    part2.getSd().setLocation(tbl_aftercreate.getSd().getLocation() + "/part2");

     Partition part3 = new Partition();
     part3.setDbName(dbName);
     part3.setTableName(tblName);
     part3.setValues(vals3);
     part3.setParameters(new HashMap());
-    part3.setSd(tbl.getSd());
-    part3.getSd().setSerdeInfo(tbl.getSd().getSerdeInfo());
-    part3.getSd().setLocation(tbl.getSd().getLocation() + "/part2");
+    part3.setSd(tbl_aftercreate.getSd());
+    part3.getSd().setSerdeInfo(tbl_aftercreate.getSd().getSerdeInfo());
+    part3.getSd().setLocation(tbl_aftercreate.getSd().getLocation() + "/part2");

     // check if the partition exists (it shouldn't)
     boolean exceptionThrown = false;
@@ -354,7 +357,11 @@
   }

   public void testAlterPartition() throws Throwable {
+    alterPartitionTester(client, hiveConf, false);
+  }

+  public static void alterPartitionTester(HiveMetaStoreClient client, HiveConf hiveConf,
+      boolean isThriftClient) throws Exception {
     try {
       String dbName = "compdb";
       String tblName = "comptbl";
@@ -398,14 +405,21 @@
       client.createTable(tbl);

+      Table tbl_aftercreate;
+      if (isThriftClient) {
+        tbl_aftercreate = client.getTable(dbName, tblName);
+      } else {
+        tbl_aftercreate = tbl;
+      }
+
       Partition part = new Partition();
       part.setDbName(dbName);
       part.setTableName(tblName);
       part.setValues(vals);
       part.setParameters(new HashMap());
-      part.setSd(tbl.getSd());
-      part.getSd().setSerdeInfo(tbl.getSd().getSerdeInfo());
-      part.getSd().setLocation(tbl.getSd().getLocation() + "/part1");
+      part.setSd(tbl_aftercreate.getSd());
+      part.getSd().setSerdeInfo(tbl_aftercreate.getSd().getSerdeInfo());
+      part.getSd().setLocation(tbl_aftercreate.getSd().getLocation() + "/part1");

       client.add_partition(part);

@@ -436,6 +450,11 @@
   }

   public void testDatabase() throws Throwable {
+    databaseTester(client, hiveConf, false);
+  }
+
+  public static void databaseTester(HiveMetaStoreClient client, HiveConf hiveConf,
+      boolean isThriftClient) throws Throwable {
     try {
       // clear up any existing databases
       client.dropDatabase("test1");
@@ -565,6 +584,11 @@
   }

   public void testSimpleTable() throws Exception {
+    simpleTableTester(client, hiveConf, false);
+  }
+
+  public static void simpleTableTester(HiveMetaStoreClient client, HiveConf hiveConf,
+      boolean isThriftClient) throws Exception {
     try {
       String dbName = "simpdb";
       String tblName = "simptbl";
@@ -609,7 +633,16 @@
       tbl.setPartitionKeys(new ArrayList());

       client.createTable(tbl);
+      Table tbl_aftercreate;
+      if (isThriftClient) {
+        // In the local metastore mode, createTable(tbl) updates 'tbl' in place.
+        // In the thrift mode it does not, so re-read the table from the metastore.
+        tbl_aftercreate = client.getTable(dbName, tblName);
+      } else {
+        tbl_aftercreate = tbl;
+      }
+
       Table tbl2 = client.getTable(dbName, tblName);
       assertNotNull(tbl2);
       assertEquals(tbl2.getDbName(), dbName);
@@ -617,7 +650,7 @@
       assertEquals(tbl2.getSd().getCols().size(), typ1.getFields().size());
       assertEquals(tbl2.getSd().isCompressed(), false);
       assertEquals(tbl2.getSd().getNumBuckets(), 1);
-      assertEquals(tbl2.getSd().getLocation(), tbl.getSd().getLocation());
+      assertEquals(tbl2.getSd().getLocation(), tbl_aftercreate.getSd().getLocation()); // MARK
       assertNotNull(tbl2.getSd().getSerdeInfo());
       sd.getSerdeInfo().setParameters(new HashMap());
       sd.getSerdeInfo().getParameters().put(
@@ -626,23 +659,23 @@
       tbl2.setTableName(tblName2);
       tbl2.setParameters(new HashMap());
       tbl2.getParameters().put("EXTERNAL", "TRUE");
-      tbl2.getSd().setLocation(tbl.getSd().getLocation() + "-2");
+      tbl2.getSd().setLocation(tbl2.getSd().getLocation() + "-2");

       List fieldSchemas = client.getFields(dbName, tblName);
       assertNotNull(fieldSchemas);
-      assertEquals(fieldSchemas.size(), tbl.getSd().getCols().size());
-      for (FieldSchema fs : tbl.getSd().getCols()) {
+      assertEquals(fieldSchemas.size(), tbl_aftercreate.getSd().getCols().size());
+      for (FieldSchema fs : tbl_aftercreate.getSd().getCols()) {
         assertTrue(fieldSchemas.contains(fs));
       }

       List fieldSchemasFull = client.getSchema(dbName, tblName);
       assertNotNull(fieldSchemasFull);
-      assertEquals(fieldSchemasFull.size(), tbl.getSd().getCols().size()
-          + tbl.getPartitionKeys().size());
-      for (FieldSchema fs : tbl.getSd().getCols()) {
+      assertEquals(fieldSchemasFull.size(), tbl_aftercreate.getSd().getCols().size()
+          + tbl_aftercreate.getPartitionKeys().size());
+      for (FieldSchema fs : tbl_aftercreate.getSd().getCols()) {
         assertTrue(fieldSchemasFull.contains(fs));
       }
-      for (FieldSchema fs : tbl.getPartitionKeys()) {
+      for (FieldSchema fs : tbl_aftercreate.getPartitionKeys()) {
         assertTrue(fieldSchemasFull.contains(fs));
       }

@@ -656,8 +689,19 @@
       assertEquals(tbl3.getSd().isCompressed(), false);
       assertEquals(tbl3.getSd().getNumBuckets(), 1);
       assertEquals(tbl3.getSd().getLocation(), tbl2.getSd().getLocation());
-      assertEquals(tbl3.getParameters(), tbl2.getParameters());
-
+      if (!isThriftClient) {
+        // params like transient_lastDdlTime are not present in remote case
+        assertEquals(tbl3.getParameters(), tbl2.getParameters());
+      } else {
+        // check that "EXTERNAL" at least is honoured among the parameters
+        String EXTKEY = "EXTERNAL";
+        for (String paramKey : tbl2.getParameters().keySet()) {
+          if (paramKey.equals(EXTKEY)) {
+            assertTrue(tbl3.getParameters().containsKey(EXTKEY));
+            assertEquals(tbl3.getParameters().get(EXTKEY), tbl2.getParameters().get(EXTKEY));
+          }
+        }
+      }
       fieldSchemas = client.getFields(dbName, tblName2);
       assertNotNull(fieldSchemas);
       assertEquals(fieldSchemas.size(), tbl2.getSd().getCols().size());
@@ -683,10 +727,10 @@
           (tbl2.getPartitionKeys() == null)
               || (tbl2.getPartitionKeys().size() == 0));

-      FileSystem fs = FileSystem.get((new Path(tbl.getSd().getLocation())).toUri(),
+      FileSystem fs = FileSystem.get((new Path(tbl_aftercreate.getSd().getLocation())).toUri(),
          hiveConf);
       client.dropTable(dbName, tblName);
-      assertFalse(fs.exists(new Path(tbl.getSd().getLocation())));
+      assertFalse(fs.exists(new Path(tbl_aftercreate.getSd().getLocation())));

       client.dropTable(dbName, tblName2);
       assertTrue(fs.exists(new Path(tbl2.getSd().getLocation())));
@@ -703,6 +747,12 @@
   }

   public void testAlterTable() throws Exception {
+    alterTableTester(client, hiveConf, false);
+  }
+
+  public static void alterTableTester(HiveMetaStoreClient client, HiveConf hiveConf,
+      boolean isThriftClient) throws Exception {
+
     try {
       String dbName = "alterdb";
       String invTblName = "alter-tbl";
@@ -753,6 +803,13 @@
       tbl.getSd().setCols(cols);
       client.createTable(tbl);

+      Table tbl_aftercreate;
+      if (isThriftClient) {
+        tbl_aftercreate = client.getTable(dbName, tblName);
+      } else {
+        tbl_aftercreate = tbl;
+      }
+
       // now try an invalid alter table
       Table tbl2 = client.getTable(dbName, tblName);
       failed = false;
@@ -776,14 +833,16 @@
       assertEquals("Alter table didn't succeed. Num buckets is different ",
           tbl2.getSd().getNumBuckets(), tbl3.getSd().getNumBuckets());
       // check that data has moved
-      FileSystem fs = FileSystem.get((new Path(tbl.getSd().getLocation())).toUri(),
+      FileSystem fs = FileSystem.get((new Path(tbl_aftercreate.getSd().getLocation())).toUri(),
          hiveConf);
-      assertFalse("old table location still exists", fs.exists(new Path(tbl
+      assertFalse("old table location still exists", fs.exists(new Path(tbl_aftercreate
          .getSd().getLocation())));
       assertTrue("data did not move to new location", fs.exists(new Path(tbl3
          .getSd().getLocation())));
-      assertEquals("alter table didn't move data correct location", tbl3
-          .getSd().getLocation(), tbl2.getSd().getLocation());
+      if (!isThriftClient) {
+        assertEquals("alter table didn't move data correct location", tbl3
+            .getSd().getLocation(), tbl2.getSd().getLocation());
+      }
     } catch (Exception e) {
       System.err.println(StringUtils.stringifyException(e));
       System.err.println("testSimpleTable() failed.");
@@ -792,7 +851,13 @@
   }

   public void testComplexTable() throws Exception {
+    complexTableTester(client, hiveConf, false);
+  }
+  public static void complexTableTester(HiveMetaStoreClient client, HiveConf hiveConf,
+      boolean isThriftClient) throws Exception {
+
+
     String dbName = "compdb";
     String tblName = "comptbl";
     String typeName = "Person";
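A note on the recurring `tbl_aftercreate` idiom above: over a thrift connection, `createTable()` cannot mutate the caller's `Table` object, so each tester re-reads the table from the server before trusting its location. A minimal sketch of how that idiom could be factored out is below; `TestTableUtil` and `createAndReload` are hypothetical names, not part of this patch:

```java
// Hypothetical helper distilling the tbl_aftercreate pattern: a local
// metastore updates 'tbl' in place during createTable(), while a thrift
// client leaves it untouched, so the table must be re-read before its
// server-assigned location can be used.
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.Table;

public final class TestTableUtil {
  public static Table createAndReload(HiveMetaStoreClient client, Table tbl,
      boolean isThriftClient) throws Exception {
    client.createTable(tbl);
    // Only the thrift path needs the round trip; local mode mutated 'tbl'.
    return isThriftClient
        ? client.getTable(tbl.getDbName(), tbl.getTableName())
        : tbl;
  }
}
```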
Index: metastore/src/test/org/apache/hadoop/hive/metastore/TestHiveMetaStoreRemoteWithClientFileOwner.java
===================================================================
--- metastore/src/test/org/apache/hadoop/hive/metastore/TestHiveMetaStoreRemoteWithClientFileOwner.java	(revision 0)
+++ metastore/src/test/org/apache/hadoop/hive/metastore/TestHiveMetaStoreRemoteWithClientFileOwner.java	(revision 0)
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+
+
+public class TestHiveMetaStoreRemoteWithClientFileOwner extends TestCase {
+  private static final String METASTORE_PORT = "29083";
+  private static final long THREAD_SLEEP_TIME = 10000;
+
+  private static final boolean clientSideFsOps = true;
+  private HiveMetaStoreClient client;
+  private HiveConf hiveConf;
+  boolean isServerRunning = false;
+
+  private static class RunMS implements Runnable {
+
+    @Override
+    public void run() {
+      System.out.println("Running metastore!");
+      String [] args = new String [1];
+      args[0] = METASTORE_PORT;
+      System.setProperty(MetaStoreUtils.HADOOP_FS_OPS_OWNER, (clientSideFsOps ? "false" : "true"));
+
+//      System.setProperty("fs.default.name", "blargh:///");
+      // The intent was to sabotage the server-side ability to write files,
+      // and see if that fixes the issues, but that is not currently possible
+      // until we remove all read-dependencies on the thrift-store side as well.
+
+      HiveMetaStore.main(args);
+    }
+
+  }
+
+  @Override
+  protected void setUp() throws Exception {
+    if(isServerRunning) {
+      return;
+    }
+
+    super.setUp();
+    Thread t = new Thread(new RunMS());
+    t.start();
+
+    // Wait a little bit for the metastore to start. Should probably have
+    // a better way of detecting if the metastore has started?
+    Thread.sleep(THREAD_SLEEP_TIME);
+
+    // Set conf to connect to the local metastore.
+    hiveConf = new HiveConf(this.getClass());
+    // hive.metastore.local should be defined in HiveConf
+    hiveConf.set("hive.metastore.local", "false");
+    hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, "thrift://localhost:" + METASTORE_PORT);
+    hiveConf.setIntVar(HiveConf.ConfVars.METATORETHRIFTRETRIES, 3);
+    hiveConf.setVar(HiveConf.ConfVars.HADOOPFS, "file:///");
+
+    // client is responsible for FS operations
+    hiveConf.setBoolVar(HiveConf.ConfVars.HADOOPFSOPSOWNER, clientSideFsOps);
+
+    client = new HiveMetaStoreClient(hiveConf);
+    // Now you have the client - run necessary tests.
+    isServerRunning = true;
+  }
+
+  public void testAll() throws Throwable {
+    TestHiveMetaStore.databaseTester(client, hiveConf, true);
+    TestHiveMetaStore.partitionTester(client, hiveConf, true);
+    TestHiveMetaStore.simpleTableTester(client, hiveConf, true);
+    TestHiveMetaStore.complexTableTester(client, hiveConf, true);
+    TestHiveMetaStore.alterTableTester(client, hiveConf, true);
+    TestHiveMetaStore.alterPartitionTester(client, hiveConf, true);
+  }
+
+}
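Both remote test classes wait for the server with a fixed `Thread.sleep(THREAD_SLEEP_TIME)` and note that a better readiness check is wanted. One possible shape, using only `java.net`, is sketched below; `PortWaiter` is illustrative and not part of this patch:

```java
// Hypothetical readiness check that could replace the fixed sleep: poll the
// thrift port until the server accepts a TCP connection, or give up after a
// deadline. Uses no Hive APIs.
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public final class PortWaiter {
  public static boolean waitForPort(String host, int port, long timeoutMs)
      throws InterruptedException {
    long deadline = System.currentTimeMillis() + timeoutMs;
    while (System.currentTimeMillis() < deadline) {
      Socket s = new Socket();
      try {
        s.connect(new InetSocketAddress(host, port), 500);
        return true; // the metastore is accepting connections
      } catch (IOException e) {
        Thread.sleep(200); // not up yet; retry until the deadline
      } finally {
        try {
          s.close();
        } catch (IOException ignored) {
        }
      }
    }
    return false;
  }
}
```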
Index: metastore/src/test/org/apache/hadoop/hive/metastore/TestHiveMetaStoreRemote.java
===================================================================
--- metastore/src/test/org/apache/hadoop/hive/metastore/TestHiveMetaStoreRemote.java	(revision 985768)
+++ metastore/src/test/org/apache/hadoop/hive/metastore/TestHiveMetaStoreRemote.java	(working copy)
@@ -21,13 +21,17 @@
 import junit.framework.TestCase;

 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.util.StringUtils;

 public class TestHiveMetaStoreRemote extends TestCase {
   private static final String METASTORE_PORT = "29083";
-private HiveMetaStoreClient client;
+  private static final long THREAD_SLEEP_TIME = 10000;
+
+  private HiveMetaStoreClient client;
   private HiveConf hiveConf;
   boolean isServerRunning = false;
+  private Thread t;

   private static class RunMS implements Runnable {

@@ -36,6 +40,10 @@
       System.out.println("Running metastore!");
       String [] args = new String [1];
       args[0] = METASTORE_PORT;
+      // System.setProperty(MetaStoreUtils.HADOOP_FS_OPS_OWNER, "true");
+      // set backend thrift server to "own" responsibility to make fs operations
+      // true is default for thrift metastore server, this is not needed - added only for sake of example/comparison
+
       HiveMetaStore.main(args);
     }

@@ -47,12 +55,12 @@
     if(isServerRunning) {
       return;
     }
-    Thread t = new Thread(new RunMS());
+    t = new Thread(new RunMS());
     t.start();

     // Wait a little bit for the metastore to start. Should probably have
     // a better way of detecting if the metastore has started?
-    Thread.sleep(5000);
+    Thread.sleep(THREAD_SLEEP_TIME);

     // Set conf to connect to the local metastore.
     hiveConf = new HiveConf(this.getClass());
@@ -61,11 +69,29 @@
     hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, "thrift://localhost:" + METASTORE_PORT);
     hiveConf.setIntVar(HiveConf.ConfVars.METATORETHRIFTRETRIES, 3);

+    // hiveConf.setBoolVar(HiveConf.ConfVars.HADOOPFSOPSOWNER, false);
+    // false is default for client, this is not needed - added only for sake of example/comparison
+
     client = new HiveMetaStoreClient(hiveConf);
     // Now you have the client - run necessary tests.
     isServerRunning = true;
   }

+  @Override
+  protected void tearDown() throws Exception {
+    try {
+      super.tearDown();
+      client.close();
+      t.interrupt();
+      Thread.sleep(THREAD_SLEEP_TIME);
+      isServerRunning = false;
+    } catch (Throwable e) {
+      System.err.println("Unable to close metastore");
+      System.err.println(StringUtils.stringifyException(e));
+      throw new Exception(e);
+    }
+  }
+
   /**
    * tests create table and partition and tries to drop the table without
    * dropping the partition
Index: metastore/src/java/org/apache/hadoop/hive/metastore/Warehouse.java
===================================================================
--- metastore/src/java/org/apache/hadoop/hive/metastore/Warehouse.java	(revision 985768)
+++ metastore/src/java/org/apache/hadoop/hive/metastore/Warehouse.java	(working copy)
@@ -40,6 +40,8 @@
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;

 /**
  * This class represents a warehouse where data of Hive tables is stored
@@ -128,6 +130,17 @@
     return new Path(getDefaultDatabasePath(dbName), tableName.toLowerCase());
   }

+  public boolean mkdirIfNotExists(Path f) throws MetaException {
+    if (!isDir(f)) {
+      if (!mkdirs(f)) {
+        throw new MetaException(f
+            + " is not a directory or unable to create one");
+      }
+      return true;
+    }
+    return false;
+  }
+
   public boolean mkdirs(Path f) throws MetaException {
     FileSystem fs = null;
     try {
@@ -328,7 +341,7 @@
     }
     return FileUtils.makePartName(colNames, vals);
   }
-
+
   public static List getPartValuesFromPartName(String partName)
       throws MetaException {
     LinkedHashMap partSpec = Warehouse.makeSpecFromName(partName);
@@ -337,4 +350,36 @@
     return values;
   }

+  public Path getTablePath(final Table tbl) throws MetaException {
+    Path tblPath = null;
+    if (!TableType.VIRTUAL_VIEW.toString().equals(tbl.getTableType())) {
+      if (tbl.getSd().getLocation() == null
+          || tbl.getSd().getLocation().isEmpty()) {
+        tblPath = getDefaultTablePath(
+            tbl.getDbName(), tbl.getTableName());
+      } else {
+        if (!MetaStoreUtils.isExternalTable(tbl) && !MetaStoreUtils.isNonNativeTable(tbl)) {
+          LOG.warn("Location: " + tbl.getSd().getLocation()
+              + " specified for non-external table:" + tbl.getTableName());
+        }
+        tblPath = getDnsPath(new Path(tbl.getSd().getLocation()));
+      }
+    }
+    return tblPath;
+  }
+
+  public Path getPartitionPath(final Table tbl, Partition part) throws MetaException {
+    Path partLocation = null;
+    String partLocationStr = part.getSd().getLocation();
+    if (partLocationStr == null || partLocationStr.isEmpty()) {
+      // set default location if not specified
+      partLocation = new Path(tbl.getSd().getLocation(), Warehouse
+          .makePartName(tbl.getPartitionKeys(), part.getValues()));
+
+    } else {
+      partLocation = getDnsPath(new Path(partLocationStr));
+    }
+    return partLocation;
+  }
+
 }
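The three `Warehouse` helpers above centralize path resolution and directory creation that `HiveMetaStore` previously inlined. A usage sketch under the patch's own semantics: `getTablePath()` returns null for virtual views, and `mkdirIfNotExists()` returns true only when it actually created the directory, so callers know whether a rollback should remove it. `WarehouseUsage` and `ensureTableDir` are hypothetical names:

```java
// Sketch of how a caller can use the new helpers; mirrors the way
// create_table_core uses getTablePath() and mkdirIfNotExists() later
// in this patch.
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.Table;

public final class WarehouseUsage {
  // Returns the path it ensured, or null for views, which have no storage.
  public static Path ensureTableDir(Warehouse wh, Table tbl) throws MetaException {
    Path tblPath = wh.getTablePath(tbl); // null for VIRTUAL_VIEW tables
    if (tblPath != null) {
      boolean madeDir = wh.mkdirIfNotExists(tblPath);
      // madeDir is true only when the directory was newly created, which
      // tells a caller whether a later rollback should delete it again.
      assert madeDir || wh.isDir(tblPath);
    }
    return tblPath;
  }
}
```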
Index: metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
===================================================================
--- metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java	(revision 985768)
+++ metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java	(working copy)
@@ -18,6 +18,7 @@

 package org.apache.hadoop.hive.metastore;

+import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
@@ -25,8 +26,11 @@
 import java.util.List;
 import java.util.Map;

+import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
 import org.apache.hadoop.hive.metastore.api.ConfigValSecurityException;
@@ -63,6 +67,8 @@
   // for thrift connects
   private int retries = 5;

+  private final boolean fileSystemOwnerMode;
+  private Warehouse wh;

   static final private Log LOG = LogFactory.getLog("hive.metastore");

@@ -79,7 +85,16 @@
       conf = new HiveConf(HiveMetaStoreClient.class);
     }

+    // figure out if we are responsible for fs operations or not
+    // assume default false to preserve behaviour prior to HIVE-1476, wherein
+    // filesystem ownership belonged on the MetaStore side
+    fileSystemOwnerMode = conf.getBoolean(MetaStoreUtils.HADOOP_FS_OPS_OWNER, false);
+    if (fileSystemOwnerMode) {
+      wh = new Warehouse(conf);
+    }
+
     boolean localMetaStore = conf.getBoolean("hive.metastore.local", false);
+
     if (localMetaStore) {
       // instantiate the metastore server handler directly instead of connecting
       // through the network
@@ -140,7 +155,79 @@
    */
   public void alter_table(String dbname, String tbl_name, Table new_tbl)
       throws InvalidOperationException, MetaException, TException {
+    boolean moveData = false;
+    Path srcPath = null;
+    Path destPath = null;
+    FileSystem srcFs = null;
+    if (fileSystemOwnerMode) {
+      // determine if this operation will result in a fs rename.
+      // this is similar (but not identical) to code in HiveAlterHandler.alterTable
+      try {
+        Table old_tbl = client.get_table(dbname, tbl_name);
+        boolean rename = false;
+        if (!new_tbl.getTableName().equalsIgnoreCase(tbl_name)
+            || !new_tbl.getDbName().equalsIgnoreCase(dbname)) {
+          rename = true;
+        }
+
+        if (rename
+            && (old_tbl.getSd().getLocation().compareTo(new_tbl.getSd().getLocation()) == 0
+              || StringUtils.isEmpty(new_tbl.getSd().getLocation()))
+            && !MetaStoreUtils.isExternalTable(old_tbl)
+            ) {
+          destPath = wh.getDefaultTablePath(new_tbl.getDbName(),
+              new_tbl.getTableName());
+          srcPath = new Path(old_tbl.getSd().getLocation());

+          moveData = true;
+
+          srcFs = wh.getFs(srcPath);
+          FileSystem destFs = wh.getFs(destPath);
+
+          // check that src and dest are on the same file system
+          if (srcFs != destFs) {
+            throw new InvalidOperationException("table new location " + destPath
+                + " is on a different file system than the old location "
+                + srcPath + ". This operation is not supported");
+          }
+          try {
+            srcFs.exists(srcPath); // check that src exists and also checks
+                                   // permissions necessary
+            if (destFs.exists(destPath)) {
+              throw new InvalidOperationException("New location for this table "
+                  + new_tbl.getDbName() + "." + new_tbl.getTableName()
+                  + " already exists : " + destPath);
+            }
+          } catch (IOException e) {
+            Warehouse.closeFs(srcFs);
+            Warehouse.closeFs(destFs);
+            throw new InvalidOperationException("Unable to access new location "
+                + destPath + " for table " + new_tbl.getDbName() + "."
+                + new_tbl.getTableName());
+          }
+
+        }
+      } catch (NoSuchObjectException e) {
+        throw new InvalidOperationException("table " + dbname + "."
+            + tbl_name + " doesn't exist");
+      }
+    }
     client.alter_table(dbname, tbl_name, new_tbl);
+    if (moveData) {
+      // change the file name in hdfs
+      // check that src exists otherwise there is no need to copy the data
+      try {
+        if (srcFs.exists(srcPath)) {
+          // rename the src to destination
+          srcFs.rename(srcPath, destPath);
+        }
+      } catch (IOException e) {
+        throw new InvalidOperationException("Unable to access old location "
+            + srcPath + " for table " + dbname + "." + tbl_name);
+      }
+    }
   }

   private void open() throws MetaException {
@@ -229,7 +316,11 @@
   public Partition add_partition(Partition new_part)
       throws InvalidObjectException, AlreadyExistsException, MetaException,
       TException {
-    return deepCopy(client.add_partition(new_part));
+    Partition ptn = client.add_partition(new_part);
+    if (fileSystemOwnerMode) {
+      wh.mkdirIfNotExists(new Path(ptn.getSd().getLocation()));
+    }
+    return deepCopy(ptn);
   }

   /**
@@ -247,14 +338,21 @@
   public Partition appendPartition(String db_name, String table_name,
       List part_vals) throws InvalidObjectException,
       AlreadyExistsException, MetaException, TException {
-    return deepCopy(client.append_partition(db_name, table_name, part_vals));
+    Partition ptn = client.append_partition(db_name, table_name, part_vals);
+    if (fileSystemOwnerMode) {
+      wh.mkdirIfNotExists(new Path(ptn.getSd().getLocation()));
+    }
+    return deepCopy(ptn);
   }

   public Partition appendPartition(String dbName, String tableName,
       String partName) throws InvalidObjectException,
       AlreadyExistsException, MetaException, TException {
-    return deepCopy(
-        client.append_partition_by_name(dbName, tableName, partName));
+    Partition ptn = client.append_partition_by_name(dbName, tableName, partName);
+    if (fileSystemOwnerMode) {
+      wh.mkdirIfNotExists(new Path(ptn.getSd().getLocation()));
+    }
+    return deepCopy(ptn);
   }

   /**
    * @param name
@@ -268,7 +366,28 @@
    */
   public boolean createDatabase(String name, String location_uri)
       throws AlreadyExistsException, MetaException, TException {
-    return client.create_database(name, location_uri);
+    boolean success = client.create_database(name, location_uri);
+    if (success && fileSystemOwnerMode) {
+      try {
+        success = wh.mkdirs(wh.getDefaultDatabasePath(name));
+      } catch (MetaException me) {
+        success = false;
+        try {
+          client.drop_database(name); // attempt to rollback
+        } catch (MetaException rollbackMe) {
+          throw new MetaException(
+              "Failed when trying to rollback metadata db create after client-side mkdir failed - "
+              + "fs and metadata potentially out of sync : " + rollbackMe.getMessage());
+          // Note : With this approach, we always have a likelihood that the metadata create succeeds,
+          // then the fs mkdir fails, and we're unable to rollback because the metadata drop fails.
+          // So, if client.drop_database throws an exception, we throw
+          // our hands up in the air and try to explain in the exception.
+        }
+      }
+    }
+    return success;
   }

   /**
@@ -291,6 +410,31 @@
         hook.commitCreateTable(tbl);
       }
       success = true;
+      if (fileSystemOwnerMode) {
+        Path tblPath = wh.getTablePath(tbl);
+        if (tblPath != null) { // null for virtual views, which have no storage to create
+          tbl.getSd().setLocation(tblPath.toString());
+          try {
+            wh.mkdirIfNotExists(tblPath);
+          } catch (MetaException me) {
+            success = false;
+            try {
+              client.drop_table(tbl.getDbName(), tbl.getTableName(), false); // attempt to rollback the metadata create
+            } catch (MetaException rollbackMe) {
+              throw new MetaException(
+                  "Failed when trying to rollback metadata table create after mkdir failed - "
+                  + "fs and metadata potentially out of sync : " + rollbackMe.getMessage());
+              // Note : With this approach, we always have a likelihood that the metadata create succeeds,
+              // then the fs mkdir fails, and we're unable to rollback because the metadata drop fails.
+              // So, if client.drop_table throws an exception, we throw
+              // our hands up in the air and try to explain in the exception.
+            }
+            // ok, rolling back done, now we probably should rethrow the exception from having failed in the first place.
+            throw me;
+          }
+        }
+      }
     } finally {
       if (!success && (hook != null)) {
         hook.rollbackCreateTable(tbl);
@@ -320,7 +464,11 @@
    * @see org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore.Iface#drop_database(java.lang.String)
    */
   public boolean dropDatabase(String name) throws MetaException, TException {
-    return client.drop_database(name);
+    boolean retval = client.drop_database(name);
+    if (retval && fileSystemOwnerMode) {
+      wh.deleteDir(wh.getDefaultDatabasePath(name), true);
+    }
+    return retval;
   }

   /**
@@ -342,7 +490,17 @@
   public boolean dropPartition(String dbName, String tableName,
       String partName, boolean deleteData) throws NoSuchObjectException,
       MetaException, TException {
-    return client.drop_partition_by_name(dbName, tableName, partName, deleteData);
+    Partition part = null;
+    Table tbl = null;
+    if (fileSystemOwnerMode) {
+      part = client.get_partition_by_name(dbName, tableName, partName);
+      tbl = client.get_table(dbName, tableName);
+    }
+    boolean success = client.drop_partition_by_name(dbName, tableName, partName, deleteData);
+    if (success && deleteData && fileSystemOwnerMode && !MetaStoreUtils.isExternalTable(tbl)) {
+      wh.deleteDir(MetaStoreUtils.getPartitionPath(part), true);
+    }
+    return success;
   }

   /**
    * @param db_name
@@ -360,7 +518,17 @@
   public boolean dropPartition(String db_name, String tbl_name,
       List part_vals, boolean deleteData) throws NoSuchObjectException,
       MetaException, TException {
-    return client.drop_partition(db_name, tbl_name, part_vals, deleteData);
+    Partition part = null;
+    Table tbl = null;
+    if (fileSystemOwnerMode) {
+      part = client.get_partition(db_name, tbl_name, part_vals);
+      tbl = client.get_table(db_name, tbl_name);
+    }
+    boolean success = client.drop_partition(db_name, tbl_name, part_vals, deleteData);
+    if (success && deleteData && fileSystemOwnerMode && !MetaStoreUtils.isExternalTable(tbl)) {
+      wh.deleteDir(MetaStoreUtils.getPartitionPath(part), true);
+    }
+    return success;
   }

   /**
@@ -410,6 +578,14 @@
     boolean success = false;
     try {
       client.drop_table(dbname, name, deleteData);
+      if (deleteData
+          && fileSystemOwnerMode
+          && !MetaStoreUtils.isExternalTable(tbl)) {
+        Path tblPath = new Path(tbl.getSd().getLocation());
+        wh.deleteDir(tblPath, true);
+      }
       if (hook != null) {
         hook.commitDropTable(tbl, deleteData);
       }
@@ -604,7 +780,7 @@
       UnknownDBException {
     return deepCopyFieldSchemas(client.get_fields(db, tableName));
   }
-
+
   /**
    * create an index
    * @param index the index object
@@ -613,12 +789,12 @@
    * @throws MetaException
    * @throws NoSuchObjectException
    * @throws TException
-   * @throws AlreadyExistsException
+   * @throws AlreadyExistsException
    */
   public void createIndex(Index index, Table indexTable) throws AlreadyExistsException,
       InvalidObjectException, MetaException, NoSuchObjectException, TException {
     client.add_index(index, indexTable);
   }
-
+
   /**
    * @param dbName
    * @param tblName
@@ -652,7 +828,7 @@

   /**
    * list all the index names of the given base table.
-   *
+   *
    * @param db_name
    * @param tbl_name
    * @param max
@@ -664,7 +840,7 @@
       throws NoSuchObjectException, MetaException, TException {
     return client.get_indexes(dbName, tblName, max);
   }
-
+
   /**
    * @param db
    * @param tableName
@@ -693,13 +869,12 @@
   public Partition appendPartitionByName(String dbName, String tableName,
       String partName) throws InvalidObjectException, AlreadyExistsException,
       MetaException, TException {
-    return deepCopy(
-        client.append_partition_by_name(dbName, tableName, partName));
+    return appendPartition(dbName, tableName, partName);
   }

   public boolean dropPartitionByName(String dbName, String tableName,
       String partName, boolean deleteData) throws NoSuchObjectException,
       MetaException, TException {
-    return client.drop_partition_by_name(dbName, tableName, partName, deleteData);
+    return dropPartition(dbName, tableName, partName, deleteData);
   }

   private HiveMetaHook getHook(Table tbl) throws MetaException {
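All of the client-side additions above share one shape: perform the metadata call, then the filesystem operation, and attempt to roll the metadata back when the filesystem step fails, giving up loudly when even the rollback fails. A generic sketch of that protocol follows; `OwnedFsOps`, `Op`, and `runOwned` are illustrative, not Hive API:

```java
// Illustrative shape of the client-owned-filesystem protocol used by
// createDatabase()/createTable() above: (1) metadata operation, (2) local
// filesystem operation, (3) metadata rollback if step 2 fails. If the
// rollback itself fails, surface both failures, since metadata and
// filesystem may now be out of sync.
public final class OwnedFsOps {
  public interface Op {
    void run() throws Exception;
  }

  public static void runOwned(Op metadataOp, Op fsOp, Op metadataRollback)
      throws Exception {
    metadataOp.run();
    try {
      fsOp.run();
    } catch (Exception fsFailure) {
      try {
        metadataRollback.run(); // best effort
      } catch (Exception rollbackFailure) {
        throw new Exception("rollback failed - fs and metadata potentially"
            + " out of sync: " + rollbackFailure.getMessage(), fsFailure);
      }
      throw fsFailure;
    }
  }
}
```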
Index: metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
===================================================================
--- metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java	(revision 985768)
+++ metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java	(working copy)
@@ -81,6 +81,9 @@
     // right now they come from jpox.properties

     private Warehouse wh; // hdfs warehouse
+
+    private boolean fileSystemOwnerMode;
+
     private final ThreadLocal threadLocalMS =
         new ThreadLocal() {
       @Override
@@ -152,6 +155,12 @@
           HiveAlterHandler.class.getName());
       alterHandler = (AlterHandler) ReflectionUtils.newInstance(getClass(
           alterHandlerName, AlterHandler.class), hiveConf);
+
+      // figure out if we are responsible for fs operations or not
+      // assume default true to preserve behaviour prior to HIVE-1476, wherein
+      // filesystem ownership belongs on the MetaStore side
+      fileSystemOwnerMode = hiveConf.getBoolean(MetaStoreUtils.HADOOP_FS_OPS_OWNER, true);
+
       wh = new Warehouse(hiveConf);

       retryInterval = HiveConf.getIntVar(hiveConf,
@@ -426,9 +435,12 @@
       try {
         ms.openTransaction();
         Database db = new Database(name, location_uri);
-        if (ms.createDatabase(db)
-            && wh.mkdirs(wh.getDefaultDatabasePath(name))) {
-          success = ms.commitTransaction();
+        if (ms.createDatabase(db)) {
+          if (fileSystemOwnerMode) {
+            success = wh.mkdirs(wh.getDefaultDatabasePath(name)) && ms.commitTransaction();
+          } else {
+            success = ms.commitTransaction();
+          }
         }
       } finally {
         if (!success) {
@@ -499,7 +511,9 @@
         if (!success) {
           ms.rollbackTransaction();
         } else {
-          wh.deleteDir(wh.getDefaultDatabasePath(name), true);
+          if (fileSystemOwnerMode) {
+            wh.deleteDir(wh.getDefaultDatabasePath(name), true);
+          }
           // it is not a terrible thing even if the data is not deleted
         }
       }
@@ -637,7 +651,6 @@

     private void create_table_core(final RawStore ms, final Table tbl)
         throws AlreadyExistsException, MetaException, InvalidObjectException {
-
       if (!MetaStoreUtils.validateName(tbl.getTableName())
           || !MetaStoreUtils.validateColNames(tbl.getSd().getCols())
           || (tbl.getPartitionKeys() != null && !MetaStoreUtils
@@ -650,35 +663,20 @@
       boolean success = false, madeDir = false;
       try {
         ms.openTransaction();
-
+
         // get_table checks whether database exists, it should be moved here
         if (is_table_exists(tbl.getDbName(), tbl.getTableName())) {
           throw new AlreadyExistsException("Table " + tbl.getTableName()
               + " already exists");
         }
-
-        if (!TableType.VIRTUAL_VIEW.toString().equals(tbl.getTableType())) {
-          if (tbl.getSd().getLocation() == null
-              || tbl.getSd().getLocation().isEmpty()) {
-            tblPath = wh.getDefaultTablePath(
-                tbl.getDbName(), tbl.getTableName());
-          } else {
-            if (!isExternal(tbl) && !MetaStoreUtils.isNonNativeTable(tbl)) {
-              LOG.warn("Location: " + tbl.getSd().getLocation()
-                  + " specified for non-external table:" + tbl.getTableName());
-            }
-            tblPath = wh.getDnsPath(new Path(tbl.getSd().getLocation()));
-          }
-          tbl.getSd().setLocation(tblPath.toString());
-        }

-        if (tblPath != null) {
-          if (!wh.isDir(tblPath)) {
-            if (!wh.mkdirs(tblPath)) {
-              throw new MetaException(tblPath
-                  + " is not a directory or unable to create one");
-            }
-            madeDir = true;
+        tblPath = wh.getTablePath(tbl);
+        if (tblPath != null) { // null for virtual views, which have no storage
+          tbl.getSd().setLocation(tblPath.toString());
+          if (fileSystemOwnerMode) {
+            madeDir = wh.mkdirIfNotExists(tblPath);
           }
         }

@@ -689,7 +687,6 @@

         ms.createTable(tbl);
         success = ms.commitTransaction();
-
       } finally {
         if (!success) {
           ms.rollbackTransaction();
@@ -700,6 +697,7 @@
       }
     }

+
     public void create_table(final Table tbl) throws AlreadyExistsException,
         MetaException, InvalidObjectException {
       incrementCounter("create_table");
@@ -754,7 +752,7 @@
         if (tbl.getSd() == null) {
           throw new MetaException("Table metadata is corrupted");
         }
-
+
         isIndexTable = isIndexTable(tbl);
         if (isIndexTable) {
           throw new RuntimeException(
@@ -778,7 +776,7 @@
         if (tbl.getSd().getLocation() != null) {
           tblPath = new Path(tbl.getSd().getLocation());
         }
-
+
         if (!ms.dropTable(dbname, name)) {
           throw new MetaException("Unable to drop table");
         }
@@ -787,7 +785,7 @@
       } finally {
         if (!success) {
           ms.rollbackTransaction();
-        } else if (deleteData && (tblPath != null) && !isExternal) {
+        } else if (deleteData && (tblPath != null) && !isExternal && fileSystemOwnerMode) {
           wh.deleteDir(tblPath, true);
           // ok even if the data is not deleted
         }
@@ -828,7 +826,7 @@
     private boolean isExternal(Table table) {
       return MetaStoreUtils.isExternalTable(table);
     }
-
+
     private boolean isIndexTable (Table table) {
      return MetaStoreUtils.isIndexTable(table);
     }
@@ -906,12 +904,8 @@
           throw new AlreadyExistsException("Partition already exists:" + part);
         }

-        if (!wh.isDir(partLocation)) {
-          if (!wh.mkdirs(partLocation)) {
-            throw new MetaException(partLocation
-                + " is not a directory or unable to create one");
-          }
-          madeDir = true;
+        if (fileSystemOwnerMode) {
+          madeDir = wh.mkdirIfNotExists(partLocation);
         }

         // set create time
@@ -1039,27 +1033,15 @@
               "Unable to add partition because table or database do not exist");
         }

-        String partLocationStr = part.getSd().getLocation();
-        if (partLocationStr == null || partLocationStr.isEmpty()) {
-          // set default location if not specified
-          partLocation = new Path(tbl.getSd().getLocation(), Warehouse
-              .makePartName(tbl.getPartitionKeys(), part.getValues()));
+        partLocation = wh.getPartitionPath(tbl, part);

-        } else {
-          partLocation = wh.getDnsPath(new Path(partLocationStr));
-        }
-
         part.getSd().setLocation(partLocation.toString());

         // Check to see if the directory already exists before calling mkdirs()
         // because if the file system is read-only, mkdirs will throw an
         // exception even if the directory already exists.
-        if (!wh.isDir(partLocation)) {
-          if (!wh.mkdirs(partLocation)) {
-            throw new MetaException(partLocation
-                + " is not a directory or unable to create one");
-          }
-          madeDir = true;
+        if (fileSystemOwnerMode) {
+          madeDir = wh.mkdirIfNotExists(partLocation);
         }

         // set create time
@@ -1115,8 +1097,6 @@
       Path partPath = null;
       Table tbl = null;
       Partition part = null;
-      boolean isArchived = false;
-      Path archiveParentDir = null;

       try {
         ms.openTransaction();
@@ -1127,10 +1107,6 @@
               + part_vals);
         }

-        isArchived = MetaStoreUtils.isArchived(part);
-        if (isArchived) {
-          archiveParentDir = MetaStoreUtils.getOriginalLocation(part);
-        }
         if (part.getSd() == null || part.getSd().getLocation() == null) {
           throw new MetaException("Partition metadata is corrupted");
         }
@@ -1138,28 +1114,24 @@
           throw new MetaException("Unable to drop partition");
         }
         success = ms.commitTransaction();
-        partPath = new Path(part.getSd().getLocation());
+        partPath = MetaStoreUtils.getPartitionPath(part);
         tbl = get_table(db_name, tbl_name);
       } finally {
         if (!success) {
           ms.rollbackTransaction();
-        } else if (deleteData && ((partPath != null) || (archiveParentDir != null))) {
+        } else if (deleteData && (partPath != null)) {
           if (tbl != null && !isExternal(tbl)) {
-            // Archived partitions have har:/to_har_file as their location.
-            // The original directory was saved in params
-            if (isArchived) {
-              assert(archiveParentDir != null);
-              wh.deleteDir(archiveParentDir, true);
-            } else {
-              assert(partPath != null);
+            assert(partPath != null);
+            if (fileSystemOwnerMode) {
               wh.deleteDir(partPath, true);
+              // ok even if the data is not deleted
             }
-            // ok even if the data is not deleted
           }
         }
       }
       return true;
     }

+
     public boolean drop_partition(final String db_name, final String tbl_name,
         final List part_vals, final boolean deleteData)
         throws NoSuchObjectException, MetaException, TException {
@@ -1724,12 +1696,12 @@
       }
       return ret;
     }
-
+
     private Index add_index_core(final RawStore ms, final Index index, final Table indexTable)
         throws InvalidObjectException, AlreadyExistsException, MetaException {
-
+
       boolean success = false, indexTableCreated = false;
-
+
       try {
         ms.openTransaction();
         Index old_index = null;
@@ -1746,13 +1718,13 @@
           throw new InvalidObjectException(
               "Unable to add index because database or the original table do not exist");
         }
-
+
         // set create time
         long time = System.currentTimeMillis() / 1000;
         Table indexTbl = indexTable;
         if (indexTbl != null) {
           try {
-            indexTbl = ms.getTable(index.getDbName(), index.getIndexTableName());
+            indexTbl = ms.getTable(index.getDbName(), index.getIndexTableName());
           } catch (Exception e) {
           }
           if (indexTbl != null) {
@@ -1812,7 +1784,7 @@

       return ret.booleanValue();
     }
-
+
     private boolean drop_index_by_name_core(final RawStore ms,
         final String dbName, final String tblName,
         final String indexName, final boolean deleteData) throws NoSuchObjectException,
@@ -1822,14 +1794,14 @@
       Path tblPath = null;
       try {
         ms.openTransaction();
-
+
         //drop the underlying index table
         Index index = get_index_by_name(dbName, tblName, indexName);
         if (index == null) {
           throw new NoSuchObjectException(indexName + " doesn't exist");
         }
         ms.dropIndex(dbName, tblName, indexName);
-
+
         String idxTblName = index.getIndexTableName();
         if (idxTblName != null) {
           Table tbl = null;
@@ -1837,7 +1809,7 @@
           if (tbl.getSd() == null) {
             throw new MetaException("Table metadata is corrupted");
           }
-
+
           if (tbl.getSd().getLocation() != null) {
             tblPath = new Path(tbl.getSd().getLocation());
           }
@@ -1889,7 +1861,7 @@
       }
       return ret;
     }
-
+
     private Index get_index_by_name_core(final RawStore ms, final String db_name,
         final String tbl_name, final String index_name)
         throws MetaException, NoSuchObjectException, TException {
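Worth making explicit: the server side (`HiveMetaStore`, and `HiveAlterHandler` below) reads `hadoop.fs.operations.owner` with a default of true, while `HiveMetaStoreClient` reads the same key with a default of false, so unconfigured deployments keep the pre-HIVE-1476 behaviour of server-owned filesystem operations. A side-by-side sketch of the two reads; the wrapper class is illustrative:

```java
// Sketch of how the two sides read the same property with opposite
// defaults; 'conf' is any Hadoop Configuration.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.MetaStoreUtils;

public final class FsOwnerDefaults {
  // Server side (HiveMetaStore, HiveAlterHandler): owns fs ops by default.
  public static boolean serverOwnsFsOps(Configuration conf) {
    return conf.getBoolean(MetaStoreUtils.HADOOP_FS_OPS_OWNER, true);
  }

  // Client side (HiveMetaStoreClient): does not own fs ops by default.
  public static boolean clientOwnsFsOps(Configuration conf) {
    return conf.getBoolean(MetaStoreUtils.HADOOP_FS_OPS_OWNER, false);
  }
}
```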
Index: metastore/src/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java
===================================================================
--- metastore/src/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java	(revision 985768)
+++ metastore/src/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java	(working copy)
@@ -40,6 +40,7 @@
 public class HiveAlterHandler implements AlterHandler {

   private Configuration hiveConf;
+  private Boolean fileSystemOwnerMode = null;
   private static final Log LOG = LogFactory.getLog(HiveAlterHandler.class
       .getName());

@@ -57,6 +58,12 @@
     if (newt == null) {
       throw new InvalidOperationException("New table is invalid: " + newt);
     }
+    if (fileSystemOwnerMode == null) {
+      // figure out if we are responsible for fs operations or not
+      // assume default true to preserve behaviour prior to HIVE-1476, wherein
+      // filesystem ownership belongs on the MetaStore side
+      fileSystemOwnerMode = hiveConf.getBoolean(MetaStoreUtils.HADOOP_FS_OPS_OWNER, true);
+    }

     if (!MetaStoreUtils.validateName(newt.getTableName())
         || !MetaStoreUtils.validateColNames(newt.getSd().getCols())) {
@@ -118,33 +125,35 @@
             newt.getTableName()).toString();
         newt.getSd().setLocation(newTblLoc);
         oldTblLoc = oldt.getSd().getLocation();
-        moveData = true;
         // check that destination does not exist otherwise we will be
         // overwriting data
         srcPath = new Path(oldTblLoc);
-        srcFs = wh.getFs(srcPath);
         destPath = new Path(newTblLoc);
-        destFs = wh.getFs(destPath);
-        // check that src and dest are on the same file system
-        if (srcFs != destFs) {
-          throw new InvalidOperationException("table new location " + destPath
-              + " is on a different file system than the old location "
-              + srcPath + ". This operation is not supported");
-        }
-        try {
-          srcFs.exists(srcPath); // check that src exists and also checks
-                                 // permissions necessary
-          if (destFs.exists(destPath)) {
-            throw new InvalidOperationException("New location for this table "
-                + newt.getDbName() + "." + newt.getTableName()
-                + " already exists : " + destPath);
+        if (fileSystemOwnerMode) {
+          moveData = true;
+          srcFs = wh.getFs(srcPath);
+          destFs = wh.getFs(destPath);
+          // check that src and dest are on the same file system
+          if (srcFs != destFs) {
+            throw new InvalidOperationException("table new location " + destPath
+                + " is on a different file system than the old location "
+                + srcPath + ". This operation is not supported");
           }
-        } catch (IOException e) {
-          Warehouse.closeFs(srcFs);
-          Warehouse.closeFs(destFs);
-          throw new InvalidOperationException("Unable to access new location "
-              + destPath + " for table " + newt.getDbName() + "."
-              + newt.getTableName());
+          try {
+            srcFs.exists(srcPath); // check that src exists and also checks
+                                   // permissions necessary
+            if (destFs.exists(destPath)) {
+              throw new InvalidOperationException("New location for this table "
+                  + newt.getDbName() + "." + newt.getTableName()
+                  + " already exists : " + destPath);
+            }
+          } catch (IOException e) {
+            Warehouse.closeFs(srcFs);
+            Warehouse.closeFs(destFs);
+            throw new InvalidOperationException("Unable to access new location "
+                + destPath + " for table " + newt.getDbName() + "."
+                + newt.getTableName());
+          }
         }
         // also the location field in partition
         List parts = msdb.getPartitions(dbname, name, 0);
Index: metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java
===================================================================
--- metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java	(revision 985768)
+++ metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java	(working copy)
@@ -39,6 +39,7 @@
 import org.apache.hadoop.hive.metastore.api.Constants;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.SerDeInfo;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
@@ -60,7 +61,9 @@
   protected static final Log LOG = LogFactory.getLog("hive.log");

   public static final String DEFAULT_DATABASE_NAME = "default";
-
+
+  public static final String HADOOP_FS_OPS_OWNER = "hadoop.fs.operations.owner";
+
   /**
    * printStackTrace
    *
@@ -883,7 +886,7 @@
     }
     return true;
   }
-
+
   public static String getIndexTableName(String dbName, String baseTblName, String indexName) {
     return dbName + "__" + baseTblName + "_" + indexName + "__";
   }
@@ -894,5 +897,16 @@
     }
     return TableType.INDEX_TABLE.toString().equals(table.getTableType());
   }
-
+
+  public static Path getPartitionPath(Partition part) {
+    if (MetaStoreUtils.isArchived(part)) {
+      // Archived partitions have har:/to_har_file as their location.
+      // The original directory was saved in params
+      return getOriginalLocation(part);
+    } else {
+      return new Path(part.getSd().getLocation());
+    }
+  }
+
 }
Index: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
===================================================================
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java	(revision 985768)
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java	(working copy)
@@ -269,6 +269,9 @@
     // For har files
     HIVEARCHIVEENABLED("hive.archive.enabled", false),
     HIVEHARPARENTDIRSETTABLE("hive.archive.har.parentdir.settable", false),
+
+    // For determining if current process scope is responsible for filesystem operations
+    HADOOPFSOPSOWNER("hadoop.fs.operations.owner", false),
     ;

     public final String varname;
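Finally, a sketch of how a client process would opt into filesystem ownership through the new `ConfVars` entry; `MyApp` is a placeholder class, and the server would be started with `hadoop.fs.operations.owner=false` so that exactly one side performs filesystem operations:

```java
// Example of enabling client-side filesystem ownership with the new
// HADOOPFSOPSOWNER ConfVars entry added above.
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;

public final class MyApp {
  public static void main(String[] args) throws Exception {
    HiveConf conf = new HiveConf(MyApp.class);
    conf.setBoolVar(HiveConf.ConfVars.HADOOPFSOPSOWNER, true); // client owns fs ops
    HiveMetaStoreClient client = new HiveMetaStoreClient(conf);
    // ... metastore calls now create/delete directories locally ...
    client.close();
  }
}
```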