diff --git a/data/files/identity_udf.jar b/data/files/identity_udf.jar
deleted file mode 100644
index 8170995..0000000
Binary files a/data/files/identity_udf.jar and /dev/null differ
diff --git a/itests/hive-unit/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java b/itests/hive-unit/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java
index 176761f..ddc2690 100644
--- a/itests/hive-unit/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java
+++ b/itests/hive-unit/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java
@@ -24,8 +24,6 @@ import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicLong;
-
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -57,10 +55,10 @@ private static final String driverName = "org.apache.hive.jdbc.HiveDriver";
   private static final FsPermission FULL_PERM = new FsPermission((short)00777);
   private static final FsPermission WRITE_ALL_PERM = new FsPermission((short)00733);
+  private static final String tmpDir = System.getProperty("test.tmp.dir");
   private HiveServer2 hiveServer2 = null;
   private final File baseDir;
   private final Path baseDfsDir;
-  private static final AtomicLong hs2Counter = new AtomicLong();
   private MiniMrShim mr;
   private MiniDFSShim dfs;
   private MiniLlapCluster llapCluster = null;
@@ -133,7 +131,6 @@ public Builder withHTTPTransport(){
       return this;
     }
-
     public MiniHS2 build() throws Exception {
       if (miniClusterType == MiniClusterType.MR && useMiniKdc) {
         throw new IOException("Can't create secure miniMr ... yet");
       }
@@ -185,9 +182,13 @@ private MiniHS2(HiveConf hiveConf, MiniClusterType miniClusterType, boolean useM
       boolean usePortsFromConf, String authType, boolean isHA) throws Exception {
     // Always use localhost for hostname as some tests like SSL CN validation ones
     // are tied to localhost being present in the certificate name
-    super(hiveConf, "localhost",
-        (usePortsFromConf ? hiveConf.getIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_PORT) : MetaStoreUtils.findFreePort()),
-        (usePortsFromConf ? hiveConf.getIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_HTTP_PORT) : MetaStoreUtils.findFreePort()));
+    super(
+        hiveConf,
+        "localhost",
+        (usePortsFromConf ? hiveConf.getIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_PORT) : MetaStoreUtils
+            .findFreePort()),
+        (usePortsFromConf ? hiveConf.getIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_HTTP_PORT) : MetaStoreUtils
+            .findFreePort()));
     hiveConf.setLongVar(ConfVars.HIVE_SERVER2_MAX_START_ATTEMPTS, 3l);
     hiveConf.setTimeVar(ConfVars.HIVE_SERVER2_SLEEP_INTERVAL_BETWEEN_START_ATTEMPTS, 10, TimeUnit.SECONDS);
@@ -195,7 +196,7 @@ private MiniHS2(HiveConf hiveConf, MiniClusterType miniClusterType, boolean useM
     this.useMiniKdc = useMiniKdc;
     this.serverPrincipal = serverPrincipal;
     this.isMetastoreRemote = isMetastoreRemote;
-    baseDir = Files.createTempDir();
+    baseDir = new File(tmpDir + "/local_base");
     localFS = FileSystem.getLocal(hiveConf);
     FileSystem fs;
@@ -226,19 +227,20 @@ private MiniHS2(HiveConf hiveConf, MiniClusterType miniClusterType, boolean useM
       }
       // store the config in system properties
       mr.setupConfiguration(getHiveConf());
-      baseDfsDir = new Path(new Path(fs.getUri()), "/base");
+      baseDfsDir = new Path(new Path(fs.getUri()), "/base");
     } else {
       // This is DFS only mode, just initialize the dfs root directory.
       fs = FileSystem.getLocal(hiveConf);
-      baseDfsDir = new Path("file://"+ baseDir.toURI().getPath());
+      baseDfsDir = new Path("file://" + baseDir.toURI().getPath());
     }
     if (useMiniKdc) {
       hiveConf.setVar(ConfVars.HIVE_SERVER2_KERBEROS_PRINCIPAL, serverPrincipal);
       hiveConf.setVar(ConfVars.HIVE_SERVER2_KERBEROS_KEYTAB, serverKeytab);
       hiveConf.setVar(ConfVars.HIVE_SERVER2_AUTHENTICATION, authType);
     }
-    String metaStoreURL = "jdbc:derby:" + baseDir.getAbsolutePath() + File.separator + "test_metastore-" +
-        hs2Counter.incrementAndGet() + ";create=true";
+    String metaStoreURL =
+        "jdbc:derby:;databaseName=" + baseDir.getAbsolutePath() + File.separator +
+        "test_metastore;create=true";
     fs.mkdirs(baseDfsDir);
     Path wareHouseDir = new Path(baseDfsDir, "warehouse");
@@ -323,9 +325,13 @@ public void stop() {
     } catch (IOException e) {
       // Ignore errors cleaning up miniMR
     }
+  }
+
+  public void cleanup() {
     FileUtils.deleteQuietly(baseDir);
   }
+
   public CLIServiceClient getServiceClient() {
     verifyStarted();
     return getServiceClientInternal();
   }
@@ -506,7 +512,7 @@ private void waitForStartup() throws Exception {
       break;
     } while (true);
   }
-
+
   public Service.STATE getState() {
     return hiveServer2.getServiceState();
   }
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java
index e8e57ef..8fee2ea 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java
@@ -28,10 +28,12 @@ import java.lang.reflect.Field;
 import java.lang.reflect.Modifier;
 import java.sql.Connection;
+import java.sql.DatabaseMetaData;
 import java.sql.DriverManager;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.sql.Types;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -65,11 +67,8 @@ import org.datanucleus.NucleusContext;
 import org.datanucleus.api.jdo.JDOPersistenceManagerFactory;
 import org.datanucleus.AbstractNucleusContext;
-import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Assert;
-import org.junit.Before;
-import org.junit.BeforeClass;
 import org.junit.Test;
 public class TestJdbcWithMiniHS2 {
@@ -77,148 +76,153 @@ private static String dataFileDir;
   private static Path kvDataFilePath;
   private static final String tmpDir = System.getProperty("test.tmp.dir");
+  private static final String testDbName = "testjdbcminihs2";
+  private static final String defaultDbName = "default";
+  private static final String tableName = "testjdbcminihs2tbl";
+  private static final String tableComment = "Simple table";
+  private static Connection conDefault = null;
+  private static Connection conTestDb = null;
+  private static String testUdfClassName =
+      "org.apache.hadoop.hive.contrib.udf.example.UDFExampleAdd";
-  private Connection hs2Conn = null;
-  @BeforeClass
-  public static void beforeTest() throws Exception {
-    Class.forName(MiniHS2.getJdbcDriverName());
+  public TestJdbcWithMiniHS2() throws Exception {
     HiveConf conf = new HiveConf();
-    conf.setBoolVar(ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
-    miniHS2 = new MiniHS2(conf);
     dataFileDir = conf.get("test.data.files").replace('\\', '/').replace("c:", "");
     kvDataFilePath = new Path(dataFileDir, "kv1.txt");
-    Map confOverlay = new HashMap();
-    miniHS2.start(confOverlay);
+    startMiniHS2(conf);
+    // Open default connections which will be used throughout the tests
+    openDefaultConnections();
+
Statement stmt = conDefault.createStatement(); + stmt.execute("drop database if exists " + testDbName + " cascade"); + stmt.execute("create database " + testDbName); + stmt.close(); + // tables in test db + createTestTables(conTestDb, testDbName); + } + + private void createTestTables(Connection conn, String dbName) throws SQLException { + Statement stmt = conn.createStatement(); + Path dataFilePath = new Path(dataFileDir, "kv1.txt"); + // We've already dropped testDbName in constructor & we also drop it in tearDownAfterClass + String prefix = dbName + "."; + String tableName = prefix + TestJdbcWithMiniHS2.tableName; + + // create a table + stmt.execute("create table " + tableName + + " (int_col int comment 'the int column', value string) comment '" + tableComment + "'"); + // load data + stmt.execute("load data local inpath '" + dataFilePath.toString() + "' into table " + tableName); + + stmt.close(); } - private Connection getConnection() throws Exception { + private static Connection getConnection() throws Exception { return getConnection(miniHS2.getJdbcURL(), System.getProperty("user.name"), "bar"); } - private Connection getConnection(String jdbcURL, String user, String pwd) throws SQLException { + private static Connection getConnection(String dbName) throws Exception { + return getConnection(miniHS2.getJdbcURL(dbName), System.getProperty("user.name"), "bar"); + } + + private static Connection getConnection(String jdbcURL, String user, String pwd) + throws SQLException { Connection conn = DriverManager.getConnection(jdbcURL, user, pwd); - conn.createStatement().execute("set hive.support.concurrency = false"); + assertNotNull(conn); return conn; } - @After - public void tearDown() throws Exception { - if (hs2Conn != null) { - hs2Conn.close(); + @AfterClass + public static void tearDownAfterClass() throws Exception { + // drop test db and its tables and views + Statement stmt = conDefault.createStatement(); + stmt.execute("set hive.support.concurrency = false"); + stmt.execute("drop database if exists " + testDbName + " cascade"); + stmt.close(); + if (conTestDb != null) { + conTestDb.close(); } + if (conDefault != null) { + conDefault.close(); + } + stopMiniHS2(); + cleanupMiniHS2(); } - @AfterClass - public static void afterTest() throws Exception { - if (miniHS2.isStarted()) { - miniHS2.stop(); + private void restoreMiniHS2AndConnections() throws Exception { + if (conTestDb != null) { + conTestDb.close(); } + if (conDefault != null) { + conDefault.close(); + } + stopMiniHS2(); + HiveConf conf = new HiveConf(); + startMiniHS2(conf); + openDefaultConnections(); } - @Test - public void testConnection() throws Exception { - String tableName = "testTab1"; - hs2Conn = getConnection(); - Statement stmt = hs2Conn.createStatement(); + private void startMiniHS2(HiveConf conf) throws Exception { + conf.setBoolVar(ConfVars.HIVE_SUPPORT_CONCURRENCY, false); + conf.setBoolVar(ConfVars.HIVE_SERVER2_LOGGING_OPERATION_ENABLED, false); + miniHS2 = new MiniHS2(conf); + Map confOverlay = new HashMap(); + miniHS2.start(confOverlay); + } + + private static void stopMiniHS2() { + if ((miniHS2 != null) && (miniHS2.isStarted())) { + miniHS2.stop(); + } + } - // create table - stmt.execute("DROP TABLE IF EXISTS " + tableName); - stmt.execute("CREATE TABLE " + tableName - + " (under_col INT COMMENT 'the under column', value STRING) COMMENT ' test table'"); + private static void cleanupMiniHS2() { + if (miniHS2 != null) { + miniHS2.cleanup(); + } + } - // load data - stmt.execute("load data local inpath '" - + 
kvDataFilePath.toString() + "' into table " + tableName); + private static void openDefaultConnections() throws Exception { + conDefault = getConnection(); + conTestDb = getConnection(testDbName); + } - ResultSet res = stmt.executeQuery("SELECT * FROM " + tableName); + @Test + public void testConnection() throws Exception { + Statement stmt = conTestDb.createStatement(); + ResultSet res = stmt.executeQuery("select * from " + tableName + " limit 5"); assertTrue(res.next()); - assertEquals("val_238", res.getString(2)); res.close(); stmt.close(); } @Test public void testParallelCompilation() throws Exception { - final String tableName = "testParallelCompilation"; - hs2Conn = getConnection(); - Statement stmt = hs2Conn.createStatement(); - - // create table - stmt.execute("DROP TABLE IF EXISTS " + tableName); - stmt.execute("CREATE TABLE " + tableName - + " (under_col INT COMMENT 'the under column', value STRING) COMMENT ' test table'"); - - // load data - stmt.execute("load data local inpath '" - + kvDataFilePath.toString() + "' into table " + tableName); - - ResultSet res = stmt.executeQuery("SELECT * FROM " + tableName); - assertTrue(res.next()); - res.close(); - - stmt.execute("SET hive.driver.parallel.compilation=true"); - stmt.execute("SET hive.server2.async.exec.async.compile=true"); - + Statement stmt = conTestDb.createStatement(); + stmt.execute("set hive.driver.parallel.compilation=true"); + stmt.execute("set hive.server2.async.exec.async.compile=true"); stmt.close(); - - startConcurrencyTest(hs2Conn, tableName, 10); - Connection conn2 = getConnection(); - startConcurrencyTest(conn2, tableName, 10); - conn2.close(); + startConcurrencyTest(conTestDb, tableName, 10); + Connection conn = getConnection(testDbName); + startConcurrencyTest(conn, tableName, 10); + conn.close(); } @Test public void testParallelCompilation2() throws Exception { - final String tableName = "testParallelCompilation2"; - hs2Conn = getConnection(); - Statement stmt = hs2Conn.createStatement(); - - // create table - stmt.execute("DROP TABLE IF EXISTS " + tableName); - stmt.execute("CREATE TABLE " + tableName - + " (under_col INT COMMENT 'the under column', value STRING) COMMENT ' test table'"); - - // load data - stmt.execute("load data local inpath '" - + kvDataFilePath.toString() + "' into table " + tableName); - - ResultSet res = stmt.executeQuery("SELECT * FROM " + tableName); - assertTrue(res.next()); - res.close(); - - stmt.execute("SET hive.driver.parallel.compilation=false"); - stmt.execute("SET hive.server2.async.exec.async.compile=true"); - + Statement stmt = conTestDb.createStatement(); + stmt.execute("set hive.driver.parallel.compilation=false"); + stmt.execute("set hive.server2.async.exec.async.compile=true"); stmt.close(); - - startConcurrencyTest(hs2Conn, tableName, 10); - Connection conn2 = getConnection(); - startConcurrencyTest(conn2, tableName, 10); - conn2.close(); + startConcurrencyTest(conTestDb, tableName, 10); + Connection conn = getConnection(testDbName); + startConcurrencyTest(conn, tableName, 10); + conn.close(); } @Test public void testConcurrentStatements() throws Exception { - String tableName = "testConcurrentStatements"; - hs2Conn = getConnection(); - Statement stmt = hs2Conn.createStatement(); - - // create table - stmt.execute("DROP TABLE IF EXISTS " + tableName); - stmt.execute("CREATE TABLE " + tableName - + " (under_col INT COMMENT 'the under column', value STRING) COMMENT ' test table'"); - - // load data - stmt.execute("load data local inpath '" - + kvDataFilePath.toString() + 
"' into table " + tableName); - - ResultSet res = stmt.executeQuery("SELECT * FROM " + tableName); - assertTrue(res.next()); - res.close(); - stmt.close(); - - startConcurrencyTest(hs2Conn, tableName, 300); + startConcurrencyTest(conTestDb, tableName, 50); } private static void startConcurrencyTest(Connection conn, String tableName, int numTasks) { @@ -227,10 +231,11 @@ private static void startConcurrencyTest(Connection conn, String tableName, int int TASK_COUNT = numTasks; SynchronousQueue executorQueue = new SynchronousQueue(); - ExecutorService workers = new ThreadPoolExecutor(1, POOL_SIZE, 20, TimeUnit.SECONDS, executorQueue); + ExecutorService workers = + new ThreadPoolExecutor(1, POOL_SIZE, 20, TimeUnit.SECONDS, executorQueue); List> list = new ArrayList>(); int i = 0; - while(i < TASK_COUNT) { + while (i < TASK_COUNT) { try { Future future = workers.submit(new JDBCTask(conn, i, tableName)); list.add(future); @@ -316,7 +321,7 @@ public Boolean call() throws SQLException { } /** This test is to connect to any database without using the command "Use <>" - * 1)connect to default database. + * 1) connect to default database. * 2) Create a new DB test_default. * 3) Connect to test_default database. * 4) Connect and create table under test_default_test. @@ -331,22 +336,22 @@ public void testURIDatabaseName() throws Exception{ String jdbcUri = miniHS2.getJdbcURL().substring(0, miniHS2.getJdbcURL().indexOf("default")); - hs2Conn= getConnection(jdbcUri+"default", System.getProperty("user.name"),"bar"); + Connection conn= getConnection(jdbcUri+"default", System.getProperty("user.name"),"bar"); String dbName="test_connection_non_default_db"; String tableInNonDefaultSchema="table_in_non_default_schema"; - Statement stmt = hs2Conn.createStatement(); + Statement stmt = conn.createStatement(); stmt.execute("create database if not exists "+dbName); stmt.close(); - hs2Conn.close(); + conn.close(); - hs2Conn = getConnection(jdbcUri+dbName,System.getProperty("user.name"),"bar"); - stmt = hs2Conn .createStatement(); + conn = getConnection(jdbcUri+dbName,System.getProperty("user.name"),"bar"); + stmt = conn .createStatement(); boolean expected = stmt.execute(" create table "+tableInNonDefaultSchema +" (x int)"); stmt.close(); - hs2Conn .close(); + conn .close(); - hs2Conn = getConnection(jdbcUri+dbName,System.getProperty("user.name"),"bar"); - stmt = hs2Conn .createStatement(); + conn = getConnection(jdbcUri+dbName,System.getProperty("user.name"),"bar"); + stmt = conn .createStatement(); ResultSet res = stmt.executeQuery("show tables"); boolean testTableExists = false; while (res.next()) { @@ -358,10 +363,10 @@ public void testURIDatabaseName() throws Exception{ assertTrue("table name "+tableInNonDefaultSchema + " found in SHOW TABLES result set", testTableExists); stmt.close(); - hs2Conn .close(); + conn .close(); - hs2Conn = getConnection(jdbcUri+"default",System.getProperty("user.name"),"bar"); - stmt = hs2Conn .createStatement(); + conn = getConnection(jdbcUri+"default",System.getProperty("user.name"),"bar"); + stmt = conn .createStatement(); res = stmt.executeQuery("show tables"); testTableExists = false; while (res.next()) { @@ -374,20 +379,20 @@ public void testURIDatabaseName() throws Exception{ assertFalse("table name "+tableInNonDefaultSchema + " NOT found in SHOW TABLES result set", testTableExists); stmt.close(); - hs2Conn .close(); + conn .close(); - hs2Conn = getConnection(jdbcUri+dbName,System.getProperty("user.name"),"bar"); - stmt = hs2Conn .createStatement(); + conn = 
getConnection(jdbcUri+dbName,System.getProperty("user.name"),"bar"); + stmt = conn .createStatement(); stmt.execute("set hive.support.concurrency = false"); res = stmt.executeQuery("show tables"); stmt.execute(" drop table if exists table_in_non_default_schema"); expected = stmt.execute("DROP DATABASE "+ dbName); stmt.close(); - hs2Conn.close(); + conn.close(); - hs2Conn = getConnection(jdbcUri+"default",System.getProperty("user.name"),"bar"); - stmt = hs2Conn .createStatement(); + conn = getConnection(jdbcUri+"default",System.getProperty("user.name"),"bar"); + stmt = conn .createStatement(); res = stmt.executeQuery("show tables"); testTableExists = false; while (res.next()) { @@ -398,49 +403,17 @@ public void testURIDatabaseName() throws Exception{ } // test URI with no dbName - hs2Conn = getConnection(jdbcUri, System.getProperty("user.name"),"bar"); - verifyCurrentDB("default", hs2Conn); - hs2Conn.close(); - - hs2Conn = getConnection(jdbcUri + ";", System.getProperty("user.name"),"bar"); - verifyCurrentDB("default", hs2Conn); - hs2Conn.close(); - - hs2Conn = getConnection(jdbcUri + ";/foo=bar;foo1=bar1", System.getProperty("user.name"),"bar"); - verifyCurrentDB("default", hs2Conn); - hs2Conn.close(); - } - - @Test - public void testConnectionSchemaAPIs() throws Exception { - String db1 = "DB1"; - /** - * get/set Schema are new in JDK7 and not available in java.sql.Connection in JDK6. - * Hence the test uses HiveConnection object to call these methods so that test will run with older JDKs - */ - hs2Conn = getConnection(); - HiveConnection hiveConn = (HiveConnection)hs2Conn; - - assertEquals("default", hiveConn.getSchema()); - Statement stmt = hs2Conn.createStatement(); - stmt.execute("DROP DATABASE IF EXISTS " + db1 + " CASCADE"); - stmt.execute("CREATE DATABASE " + db1); - assertEquals("default", hiveConn.getSchema()); - - stmt.execute("USE " + db1); - assertEquals(db1, hiveConn.getSchema()); - - stmt.execute("USE default"); - assertEquals("default", hiveConn.getSchema()); + conn = getConnection(jdbcUri, System.getProperty("user.name"),"bar"); + verifyCurrentDB("default", conn); + conn.close(); - hiveConn.setSchema(db1); - assertEquals(db1, hiveConn.getSchema()); - hiveConn.setSchema("default"); - assertEquals("default", hiveConn.getSchema()); + conn = getConnection(jdbcUri + ";", System.getProperty("user.name"),"bar"); + verifyCurrentDB("default", conn); + conn.close(); - assertTrue(hiveConn.getCatalog().isEmpty()); - hiveConn.setCatalog("foo"); - assertTrue(hiveConn.getCatalog().isEmpty()); + conn = getConnection(jdbcUri + ";/foo=bar;foo1=bar1", System.getProperty("user.name"),"bar"); + verifyCurrentDB("default", conn); + conn.close(); } /** @@ -452,7 +425,6 @@ public void testConnectionSchemaAPIs() throws Exception { */ private void verifyCurrentDB(String expectedDbName, Connection hs2Conn) throws Exception { String verifyTab = "miniHS2DbVerificationTable"; - hs2Conn = getConnection(); Statement stmt = hs2Conn.createStatement(); stmt.execute("DROP TABLE IF EXISTS " + expectedDbName + "." + verifyTab); stmt.execute("CREATE TABLE " + expectedDbName + "." + verifyTab + "(id INT)"); @@ -461,6 +433,32 @@ private void verifyCurrentDB(String expectedDbName, Connection hs2Conn) throws E stmt.close(); } + @Test + public void testConnectionSchemaAPIs() throws Exception { + /** + * get/set Schema are new in JDK7 and not available in java.sql.Connection in JDK6. 
Hence the + * test uses HiveConnection object to call these methods so that test will run with older JDKs + */ + HiveConnection hiveConn = (HiveConnection) conDefault; + assertEquals(defaultDbName, hiveConn.getSchema()); + Statement stmt = conDefault.createStatement(); + + stmt.execute("USE " + testDbName); + assertEquals(testDbName, hiveConn.getSchema()); + + stmt.execute("USE " + defaultDbName); + assertEquals(defaultDbName, hiveConn.getSchema()); + + hiveConn.setSchema(defaultDbName); + assertEquals(defaultDbName, hiveConn.getSchema()); + hiveConn.setSchema(defaultDbName); + assertEquals(defaultDbName, hiveConn.getSchema()); + + assertTrue(hiveConn.getCatalog().isEmpty()); + hiveConn.setCatalog("foo"); + assertTrue(hiveConn.getCatalog().isEmpty()); + } + /** * This method tests whether while creating a new connection, the config * variables specified in the JDBC URI are properly set for the connection. @@ -470,31 +468,27 @@ private void verifyCurrentDB(String expectedDbName, Connection hs2Conn) throws E */ @Test public void testNewConnectionConfiguration() throws Exception { - // Set some conf parameters - String hiveConf = "hive.cli.print.header=true;hive.server2.async.exec.shutdown.timeout=20;" - + "hive.server2.async.exec.threads=30;hive.server2.thrift.max.worker.threads=15"; + String hiveConf = + "hive.cli.print.header=true;hive.server2.async.exec.shutdown.timeout=20;" + + "hive.server2.async.exec.threads=30;hive.server2.thrift.max.worker.threads=15"; // Set some conf vars String hiveVar = "stab=salesTable;icol=customerID"; String jdbcUri = miniHS2.getJdbcURL() + "?" + hiveConf + "#" + hiveVar; - // Open a new connection with these conf & vars Connection con1 = DriverManager.getConnection(jdbcUri); - - // Execute "set" command and retrieve values for the conf & vars specified - // above + // Execute "set" command and retrieve values for the conf & vars specified above // Assert values retrieved Statement stmt = con1.createStatement(); - // Verify that the property has been properly set while creating the // connection above verifyConfProperty(stmt, "hive.cli.print.header", "true"); verifyConfProperty(stmt, "hive.server2.async.exec.shutdown.timeout", "20"); verifyConfProperty(stmt, "hive.server2.async.exec.threads", "30"); - verifyConfProperty(stmt, "hive.server2.thrift.max.worker.threads", - "15"); + verifyConfProperty(stmt, "hive.server2.thrift.max.worker.threads", "15"); verifyConfProperty(stmt, "stab", "salesTable"); verifyConfProperty(stmt, "icol", "customerID"); + stmt.close(); con1.close(); } @@ -507,72 +501,37 @@ private void verifyConfProperty(Statement stmt, String property, } } - private void setSerializeInTasksInConf(HiveConf conf) { - conf.setBoolean("hive.server2.thrift.resultset.serialize.in.tasks", true); - conf.setInt("hive.server2.thrift.resultset.max.fetch.size", 1000); - } - - private void unsetSerializeInTasksInConf(HiveConf conf) { - conf.setBoolean("hive.server2.thrift.resultset.serialize.in.tasks", false); - conf.unset("hive.server2.thrift.resultset.max.fetch.size"); - } - @Test public void testMetadataQueriesWithSerializeThriftInTasks() throws Exception { - //stop HiveServer2 - if (miniHS2.isStarted()) { - miniHS2.stop(); - } - - HiveConf conf = new HiveConf(); - String userName; - setSerializeInTasksInConf(conf); - miniHS2 = new MiniHS2(conf); - Map confOverlay = new HashMap(); - miniHS2.start(confOverlay); - - userName = System.getProperty("user.name"); - hs2Conn = getConnection(miniHS2.getJdbcURL(), userName, "password"); - Statement stmt = 
hs2Conn.createStatement(); - stmt.execute("drop table if exists testThriftSerializeShow"); - stmt.execute("create table testThriftSerializeShow (a int)"); + Statement stmt = conTestDb.createStatement(); + setSerializeInTasksInConf(stmt); ResultSet rs = stmt.executeQuery("show tables"); assertTrue(rs.next()); - stmt.execute("describe testThriftSerializeShow"); - stmt.execute("explain select a from testThriftSerializeShow"); - stmt.execute("drop table testThriftSerializeShow"); + stmt.execute("describe " + tableName); + stmt.execute("explain select * from " + tableName); + // Note: by closing stmt object, we are also reverting any session specific config changes. stmt.close(); } @Test public void testSelectThriftSerializeInTasks() throws Exception { - if (miniHS2.isStarted()) { - miniHS2.stop(); - } - - HiveConf conf = new HiveConf(); - String userName; - setSerializeInTasksInConf(conf); - conf.setBoolVar(HiveConf.ConfVars.HIVEOPTIMIZEMETADATAQUERIES, false); - miniHS2 = new MiniHS2(conf); - Map confOverlay = new HashMap(); - miniHS2.start(confOverlay); - - userName = System.getProperty("user.name"); - hs2Conn = getConnection(miniHS2.getJdbcURL(), userName, "password"); - Statement stmt = hs2Conn.createStatement(); - + Statement stmt = conTestDb.createStatement(); + setSerializeInTasksInConf(stmt); + stmt.execute("set hive.compute.query.using.stats=false"); stmt.execute("drop table if exists testSelectThriftOrders"); stmt.execute("drop table if exists testSelectThriftCustomers"); stmt.execute("create table testSelectThriftOrders (orderid int, orderdate string, customerid int)"); stmt.execute("create table testSelectThriftCustomers (customerid int, customername string, customercountry string)"); - stmt.execute("insert into testSelectThriftOrders values (1, '2015-09-09', 123), (2, '2015-10-10', 246), (3, '2015-11-11', 356)"); - stmt.execute("insert into testSelectThriftCustomers values (123, 'David', 'America'), (246, 'John', 'Canada'), (356, 'Mary', 'CostaRica')"); + stmt.execute("insert into testSelectThriftOrders values (1, '2015-09-09', 123), " + + "(2, '2015-10-10', 246), (3, '2015-11-11', 356)"); + stmt.execute("insert into testSelectThriftCustomers values (123, 'David', 'America'), " + + "(246, 'John', 'Canada'), (356, 'Mary', 'CostaRica')"); ResultSet countOrders = stmt.executeQuery("select count(*) from testSelectThriftOrders"); while (countOrders.next()) { - assertEquals(3, countOrders.getInt(1)); + assertEquals(3, countOrders.getInt(1)); } - ResultSet maxOrders = stmt.executeQuery("select max(customerid) from testSelectThriftCustomers"); + ResultSet maxOrders = + stmt.executeQuery("select max(customerid) from testSelectThriftCustomers"); while (maxOrders.next()) { assertEquals(356, maxOrders.getInt(1)); } @@ -583,29 +542,20 @@ public void testSelectThriftSerializeInTasks() throws Exception { @Test public void testJoinThriftSerializeInTasks() throws Exception { - //stop HiveServer2 - if (miniHS2.isStarted()) { - miniHS2.stop(); - } - HiveConf conf = new HiveConf(); - String userName; - - setSerializeInTasksInConf(conf); - - miniHS2 = new MiniHS2(conf); - Map confOverlay = new HashMap(); - miniHS2.start(confOverlay); - - userName = System.getProperty("user.name"); - hs2Conn = getConnection(miniHS2.getJdbcURL(), userName, "password"); - Statement stmt = hs2Conn.createStatement(); + Statement stmt = conTestDb.createStatement(); + setSerializeInTasksInConf(stmt); stmt.execute("drop table if exists testThriftJoinOrders"); stmt.execute("drop table if exists testThriftJoinCustomers"); 
stmt.execute("create table testThriftJoinOrders (orderid int, orderdate string, customerid int)"); stmt.execute("create table testThriftJoinCustomers (customerid int, customername string, customercountry string)"); - stmt.execute("insert into testThriftJoinOrders values (1, '2015-09-09', 123), (2, '2015-10-10', 246), (3, '2015-11-11', 356)"); - stmt.execute("insert into testThriftJoinCustomers values (123, 'David', 'America'), (246, 'John', 'Canada'), (356, 'Mary', 'CostaRica')"); - ResultSet joinResultSet = stmt.executeQuery("select testThriftJoinOrders.orderid, testThriftJoinCustomers.customername from testThriftJoinOrders inner join testThriftJoinCustomers where testThriftJoinOrders.customerid=testThriftJoinCustomers.customerid"); + stmt.execute("insert into testThriftJoinOrders values (1, '2015-09-09', 123), (2, '2015-10-10', 246), " + + "(3, '2015-11-11', 356)"); + stmt.execute("insert into testThriftJoinCustomers values (123, 'David', 'America'), " + + "(246, 'John', 'Canada'), (356, 'Mary', 'CostaRica')"); + ResultSet joinResultSet = + stmt.executeQuery("select testThriftJoinOrders.orderid, testThriftJoinCustomers.customername " + + "from testThriftJoinOrders inner join testThriftJoinCustomers where " + + "testThriftJoinOrders.customerid=testThriftJoinCustomers.customerid"); Map expectedResult = new HashMap(); expectedResult.put(1, "David"); expectedResult.put(2, "John"); @@ -621,28 +571,16 @@ public void testJoinThriftSerializeInTasks() throws Exception { @Test public void testEmptyResultsetThriftSerializeInTasks() throws Exception { - //stop HiveServer2 - if (miniHS2.isStarted()) { - miniHS2.stop(); - } - - HiveConf conf = new HiveConf(); - String userName; - setSerializeInTasksInConf(conf); - miniHS2 = new MiniHS2(conf); - Map confOverlay = new HashMap(); - miniHS2.start(confOverlay); - - userName = System.getProperty("user.name"); - hs2Conn = getConnection(miniHS2.getJdbcURL(), userName, "password"); - Statement stmt = hs2Conn.createStatement(); + Statement stmt = conTestDb.createStatement(); + setSerializeInTasksInConf(stmt); stmt.execute("drop table if exists testThriftSerializeShow1"); stmt.execute("drop table if exists testThriftSerializeShow2"); stmt.execute("create table testThriftSerializeShow1 (a int)"); stmt.execute("create table testThriftSerializeShow2 (b int)"); stmt.execute("insert into testThriftSerializeShow1 values (1)"); stmt.execute("insert into testThriftSerializeShow2 values (2)"); - ResultSet rs = stmt.executeQuery("select * from testThriftSerializeShow1 inner join testThriftSerializeShow2 where testThriftSerializeShow1.a=testThriftSerializeShow2.b"); + ResultSet rs = stmt.executeQuery("select * from testThriftSerializeShow1 inner join " + + "testThriftSerializeShow2 where testThriftSerializeShow1.a=testThriftSerializeShow2.b"); assertTrue(!rs.next()); stmt.execute("drop table testThriftSerializeShow1"); stmt.execute("drop table testThriftSerializeShow2"); @@ -651,72 +589,63 @@ public void testEmptyResultsetThriftSerializeInTasks() throws Exception { @Test public void testFloatCast2DoubleThriftSerializeInTasks() throws Exception { - //stop HiveServer2 - if (miniHS2.isStarted()) { - miniHS2.stop(); - } - - HiveConf conf = new HiveConf(); - String userName; - setSerializeInTasksInConf(conf); - miniHS2 = new MiniHS2(conf); - Map confOverlay = new HashMap(); - miniHS2.start(confOverlay); - - userName = System.getProperty("user.name"); - hs2Conn = getConnection(miniHS2.getJdbcURL(), userName, "password"); - Statement stmt = hs2Conn.createStatement(); + Statement 
stmt = conTestDb.createStatement(); + setSerializeInTasksInConf(stmt); stmt.execute("drop table if exists testThriftSerializeShow1"); stmt.execute("drop table if exists testThriftSerializeShow2"); stmt.execute("create table testThriftSerializeShow1 (a float)"); stmt.execute("create table testThriftSerializeShow2 (b double)"); stmt.execute("insert into testThriftSerializeShow1 values (1.1), (2.2), (3.3)"); stmt.execute("insert into testThriftSerializeShow2 values (2.2), (3.3), (4.4)"); - ResultSet rs = stmt.executeQuery("select * from testThriftSerializeShow1 inner join testThriftSerializeShow2 where testThriftSerializeShow1.a=testThriftSerializeShow2.b"); + ResultSet rs = + stmt.executeQuery("select * from testThriftSerializeShow1 inner join " + + "testThriftSerializeShow2 where testThriftSerializeShow1.a=testThriftSerializeShow2.b"); assertTrue(!rs.next()); stmt.execute("drop table testThriftSerializeShow1"); stmt.execute("drop table testThriftSerializeShow2"); stmt.close(); } - @Test - public void testEnableThriftSerializeInTasks() throws Exception { - //stop HiveServer2 - if (miniHS2.isStarted()) { - miniHS2.stop(); - } - - HiveConf conf = new HiveConf(); - String userName; - setSerializeInTasksInConf(conf); - miniHS2 = new MiniHS2(conf); - Map confOverlay = new HashMap(); - miniHS2.start(confOverlay); - - userName = System.getProperty("user.name"); - hs2Conn = getConnection(miniHS2.getJdbcURL(), userName, "password"); - Statement stmt = hs2Conn.createStatement(); - stmt.execute("drop table if exists testThriftSerializeShow1"); - stmt.execute("drop table if exists testThriftSerializeShow2"); - stmt.execute("create table testThriftSerializeShow1 (a int)"); - stmt.execute("create table testThriftSerializeShow2 (b int)"); - stmt.execute("insert into testThriftSerializeShow1 values (1)"); - stmt.execute("insert into testThriftSerializeShow2 values (2)"); - ResultSet rs = stmt.executeQuery("select * from testThriftSerializeShow1 inner join testThriftSerializeShow2 where testThriftSerializeShow1.a=testThriftSerializeShow2.b"); - assertTrue(!rs.next()); - - unsetSerializeInTasksInConf(conf); - rs = stmt.executeQuery("select * from testThriftSerializeShow1 inner join testThriftSerializeShow2 where testThriftSerializeShow1.a=testThriftSerializeShow2.b"); - assertTrue(!rs.next()); - - setSerializeInTasksInConf(conf); - rs = stmt.executeQuery("select * from testThriftSerializeShow1 inner join testThriftSerializeShow2 where testThriftSerializeShow1.a=testThriftSerializeShow2.b"); - assertTrue(!rs.next()); - - stmt.execute("drop table testThriftSerializeShow1"); - stmt.execute("drop table testThriftSerializeShow2"); - stmt.close(); - } + @Test + public void testEnableThriftSerializeInTasks() throws Exception { + Statement stmt = conTestDb.createStatement(); + stmt.execute("drop table if exists testThriftSerializeShow1"); + stmt.execute("drop table if exists testThriftSerializeShow2"); + stmt.execute("create table testThriftSerializeShow1 (a int)"); + stmt.execute("create table testThriftSerializeShow2 (b int)"); + stmt.execute("insert into testThriftSerializeShow1 values (1)"); + stmt.execute("insert into testThriftSerializeShow2 values (2)"); + ResultSet rs = + stmt.executeQuery("select * from testThriftSerializeShow1 inner join " + + "testThriftSerializeShow2 where testThriftSerializeShow1.a=testThriftSerializeShow2.b"); + assertTrue(!rs.next()); + + unsetSerializeInTasksInConf(stmt); + rs = + stmt.executeQuery("select * from testThriftSerializeShow1 inner join " + + "testThriftSerializeShow2 
where testThriftSerializeShow1.a=testThriftSerializeShow2.b"); + assertTrue(!rs.next()); + + setSerializeInTasksInConf(stmt); + rs = + stmt.executeQuery("select * from testThriftSerializeShow1 inner join " + + "testThriftSerializeShow2 where testThriftSerializeShow1.a=testThriftSerializeShow2.b"); + assertTrue(!rs.next()); + + stmt.execute("drop table testThriftSerializeShow1"); + stmt.execute("drop table testThriftSerializeShow2"); + stmt.close(); + } + + private void setSerializeInTasksInConf(Statement stmt) throws SQLException { + stmt.execute("set hive.server2.thrift.resultset.serialize.in.tasks=true"); + stmt.execute("set hive.server2.thrift.resultset.max.fetch.size=1000"); + } + + private void unsetSerializeInTasksInConf(Statement stmt) throws SQLException { + stmt.execute("set hive.server2.thrift.resultset.serialize.in.tasks=false"); + stmt.execute("set hive.server2.thrift.resultset.max.fetch.size"); + } /** * Tests the creation of the 3 scratch dirs: hdfs, local, downloaded resources (which is also local). @@ -727,25 +656,21 @@ public void testEnableThriftSerializeInTasks() throws Exception { @Test public void testSessionScratchDirs() throws Exception { // Stop HiveServer2 - if (miniHS2.isStarted()) { - miniHS2.stop(); - } + stopMiniHS2(); HiveConf conf = new HiveConf(); String userName; Path scratchDirPath; - // 1. Test with doAs=false - conf.setBoolean("hive.server2.enable.doAs", false); // Set a custom prefix for hdfs scratch dir path conf.set("hive.exec.scratchdir", tmpDir + "/hs2"); // Set a scratch dir permission String fsPermissionStr = "700"; conf.set("hive.scratch.dir.permission", fsPermissionStr); // Start an instance of HiveServer2 which uses miniMR - miniHS2 = new MiniHS2(conf); - Map confOverlay = new HashMap(); - miniHS2.start(confOverlay); + startMiniHS2(conf); + // 1. Test with doAs=false + String sessionConf="hive.server2.enable.doAs=false"; userName = System.getProperty("user.name"); - hs2Conn = getConnection(miniHS2.getJdbcURL(), userName, "password"); + Connection conn = getConnection(miniHS2.getJdbcURL(testDbName, sessionConf), userName, "password"); // FS FileSystem fs = miniHS2.getLocalFS(); FsPermission expectedFSPermission = new FsPermission(HiveConf.getVar(conf, @@ -763,20 +688,13 @@ public void testSessionScratchDirs() throws Exception { // Downloaded resources dir scratchDirPath = new Path(HiveConf.getVar(conf, HiveConf.ConfVars.DOWNLOADED_RESOURCES_DIR)); verifyScratchDir(conf, fs, scratchDirPath, expectedFSPermission, userName, true); - hs2Conn.close(); + conn.close(); // 2. 
Test with doAs=true - // Restart HiveServer2 with doAs=true - if (miniHS2.isStarted()) { - miniHS2.stop(); - } - conf.setBoolean("hive.server2.enable.doAs", true); - // Start HS2 - miniHS2 = new MiniHS2(conf); - miniHS2.start(confOverlay); + sessionConf="hive.server2.enable.doAs=true"; // Test for user "neo" userName = "neo"; - hs2Conn = getConnection(miniHS2.getJdbcURL(), userName, "the-one"); + conn = getConnection(miniHS2.getJdbcURL(testDbName, sessionConf), userName, "the-one"); // Verify scratch dir paths and permission // HDFS scratch dir @@ -790,45 +708,32 @@ public void testSessionScratchDirs() throws Exception { // Downloaded resources dir scratchDirPath = new Path(HiveConf.getVar(conf, HiveConf.ConfVars.DOWNLOADED_RESOURCES_DIR)); verifyScratchDir(conf, fs, scratchDirPath, expectedFSPermission, userName, true); - hs2Conn.close(); - - // Test for user "trinity" - userName = "trinity"; - hs2Conn = getConnection(miniHS2.getJdbcURL(), userName, "the-one"); - - // Verify scratch dir paths and permission - // HDFS scratch dir - scratchDirPath = new Path(HiveConf.getVar(conf, HiveConf.ConfVars.SCRATCHDIR) + "/" + userName); - verifyScratchDir(conf, fs, scratchDirPath, expectedFSPermission, userName, false); - - // Local scratch dir - scratchDirPath = new Path(HiveConf.getVar(conf, HiveConf.ConfVars.LOCALSCRATCHDIR)); - verifyScratchDir(conf, fs, scratchDirPath, expectedFSPermission, userName, true); + conn.close(); - // Downloaded resources dir - scratchDirPath = new Path(HiveConf.getVar(conf, HiveConf.ConfVars.DOWNLOADED_RESOURCES_DIR)); - verifyScratchDir(conf, fs, scratchDirPath, expectedFSPermission, userName, true); + // Restore original state + restoreMiniHS2AndConnections(); } - /** Test UDF whitelist - * - verify default value - * - verify udf allowed with default whitelist - * - verify udf allowed with specific whitelist - * - verify udf disallowed when not in whitelist + + /** + * Test UDF whitelist + * - verify default value + * - verify udf allowed with default whitelist + * - verify udf allowed with specific whitelist + * - verify udf disallowed when not in whitelist * @throws Exception */ @Test - public void testUdfWhiteList() throws Exception { + public void testUdfWhiteBlackList() throws Exception { HiveConf testConf = new HiveConf(); assertTrue(testConf.getVar(ConfVars.HIVE_SERVER2_BUILTIN_UDF_WHITELIST).isEmpty()); // verify that udf in default whitelist can be executed - hs2Conn = getConnection(); - Statement stmt = hs2Conn.createStatement(); + Statement stmt = conDefault.createStatement(); stmt.executeQuery("SELECT substr('foobar', 4) "); - hs2Conn.close(); - miniHS2.stop(); + stmt.close(); // setup whitelist + stopMiniHS2(); Set funcNames = FunctionRegistry.getFunctionNames(); funcNames.remove("reflect"); String funcNameStr = ""; @@ -837,14 +742,12 @@ public void testUdfWhiteList() throws Exception { } funcNameStr = funcNameStr.substring(1); // remove ',' at begining testConf.setVar(ConfVars.HIVE_SERVER2_BUILTIN_UDF_WHITELIST, funcNameStr); - miniHS2 = new MiniHS2(testConf); - miniHS2.start(new HashMap()); - - hs2Conn = getConnection(miniHS2.getJdbcURL(), System.getProperty("user.name"), "bar"); - stmt = hs2Conn.createStatement(); + startMiniHS2(testConf); + Connection conn = + getConnection(miniHS2.getJdbcURL(testDbName), System.getProperty("user.name"), "bar"); + stmt = conn.createStatement(); // verify that udf in whitelist can be executed stmt.executeQuery("SELECT substr('foobar', 3) "); - // verify that udf not in whitelist fails try { stmt.executeQuery("SELECT 
reflect('java.lang.String', 'valueOf', 1) "); @@ -852,6 +755,10 @@ public void testUdfWhiteList() throws Exception { } catch (SQLException e) { // expected } + conn.close(); + + // Restore original state + restoreMiniHS2AndConnections(); } /** Test UDF blacklist @@ -864,18 +771,16 @@ public void testUdfWhiteList() throws Exception { public void testUdfBlackList() throws Exception { HiveConf testConf = new HiveConf(); assertTrue(testConf.getVar(ConfVars.HIVE_SERVER2_BUILTIN_UDF_BLACKLIST).isEmpty()); - hs2Conn = getConnection(); - Statement stmt = hs2Conn.createStatement(); + Statement stmt = conDefault.createStatement(); // verify that udf in default whitelist can be executed stmt.executeQuery("SELECT substr('foobar', 4) "); - hs2Conn.close(); - miniHS2.stop(); + stopMiniHS2(); testConf.setVar(ConfVars.HIVE_SERVER2_BUILTIN_UDF_BLACKLIST, "reflect"); - miniHS2 = new MiniHS2(testConf); - miniHS2.start(new HashMap()); - hs2Conn = getConnection(miniHS2.getJdbcURL(), System.getProperty("user.name"), "bar"); - stmt = hs2Conn.createStatement(); + startMiniHS2(testConf); + Connection conn = + getConnection(miniHS2.getJdbcURL(testDbName), System.getProperty("user.name"), "bar"); + stmt = conn.createStatement(); try { stmt.executeQuery("SELECT reflect('java.lang.String', 'valueOf', 1) "); @@ -883,6 +788,9 @@ public void testUdfBlackList() throws Exception { } catch (SQLException e) { // expected } + conn.close(); + // Restore original state + restoreMiniHS2AndConnections(); } /** Test UDF blacklist overrides whitelist @@ -890,9 +798,7 @@ public void testUdfBlackList() throws Exception { */ @Test public void testUdfBlackListOverride() throws Exception { - if (miniHS2.isStarted()) { - miniHS2.stop(); - } + stopMiniHS2(); // setup whitelist HiveConf testConf = new HiveConf(); @@ -904,11 +810,10 @@ public void testUdfBlackListOverride() throws Exception { funcNameStr = funcNameStr.substring(1); // remove ',' at begining testConf.setVar(ConfVars.HIVE_SERVER2_BUILTIN_UDF_WHITELIST, funcNameStr); testConf.setVar(ConfVars.HIVE_SERVER2_BUILTIN_UDF_BLACKLIST, "reflect"); - miniHS2 = new MiniHS2(testConf); - miniHS2.start(new HashMap()); - - hs2Conn = getConnection(miniHS2.getJdbcURL(), System.getProperty("user.name"), "bar"); - Statement stmt = hs2Conn.createStatement(); + startMiniHS2(testConf); + Connection conn = + getConnection(miniHS2.getJdbcURL(testDbName), System.getProperty("user.name"), "bar"); + Statement stmt = conn.createStatement(); // verify that udf in black list fails even though it's included in whitelist try { @@ -917,6 +822,9 @@ public void testUdfBlackListOverride() throws Exception { } catch (SQLException e) { // expected } + conn.close(); + // Restore original state + restoreMiniHS2AndConnections(); } /** @@ -927,19 +835,15 @@ public void testUdfBlackListOverride() throws Exception { @Test public void testRootScratchDir() throws Exception { // Stop HiveServer2 - if (miniHS2.isStarted()) { - miniHS2.stop(); - } + stopMiniHS2(); HiveConf conf = new HiveConf(); String userName; Path scratchDirPath; conf.set("hive.exec.scratchdir", tmpDir + "/hs2"); // Start an instance of HiveServer2 which uses miniMR - miniHS2 = new MiniHS2(conf); - Map confOverlay = new HashMap(); - miniHS2.start(confOverlay); + startMiniHS2(conf); userName = System.getProperty("user.name"); - hs2Conn = getConnection(miniHS2.getJdbcURL(), userName, "password"); + Connection conn = getConnection(miniHS2.getJdbcURL(testDbName), userName, "password"); // FS FileSystem fs = miniHS2.getLocalFS(); FsPermission 
expectedFSPermission = new FsPermission((short)00733); @@ -947,19 +851,20 @@ public void testRootScratchDir() throws Exception { // HDFS scratch dir scratchDirPath = new Path(HiveConf.getVar(conf, HiveConf.ConfVars.SCRATCHDIR)); verifyScratchDir(conf, fs, scratchDirPath, expectedFSPermission, userName, false); - hs2Conn.close(); + conn.close(); // Test with multi-level scratch dir path // Stop HiveServer2 - if (miniHS2.isStarted()) { - miniHS2.stop(); - } + stopMiniHS2(); conf.set("hive.exec.scratchdir", tmpDir + "/level1/level2/level3"); - miniHS2 = new MiniHS2(conf); - miniHS2.start(confOverlay); - hs2Conn = getConnection(miniHS2.getJdbcURL(), userName, "password"); + startMiniHS2(conf); + conn = getConnection(miniHS2.getJdbcURL(testDbName), userName, "password"); scratchDirPath = new Path(HiveConf.getVar(conf, HiveConf.ConfVars.SCRATCHDIR)); verifyScratchDir(conf, fs, scratchDirPath, expectedFSPermission, userName, false); + conn.close(); + + // Restore original state + restoreMiniHS2AndConnections(); } private void verifyScratchDir(HiveConf conf, FileSystem fs, Path scratchDirPath, @@ -980,27 +885,24 @@ private void verifyScratchDir(HiveConf conf, FileSystem fs, Path scratchDirPath, @Test public void testHttpHeaderSize() throws Exception { // Stop HiveServer2 - if (miniHS2.isStarted()) { - miniHS2.stop(); - } + stopMiniHS2(); HiveConf conf = new HiveConf(); conf.set("hive.server2.transport.mode", "http"); conf.setInt("hive.server2.thrift.http.request.header.size", 1024); conf.setInt("hive.server2.thrift.http.response.header.size", 1024); - miniHS2 = new MiniHS2(conf); - Map confOverlay = new HashMap(); - miniHS2.start(confOverlay); + startMiniHS2(conf); // Username is added to the request header String userName = StringUtils.leftPad("*", 100); + Connection conn = null; // This should go fine, since header should be less than the configured header size try { - hs2Conn = getConnection(miniHS2.getJdbcURL(), userName, "password"); + conn = getConnection(miniHS2.getJdbcURL(testDbName), userName, "password"); } catch (Exception e) { fail("Not expecting exception: " + e); } finally { - if (hs2Conn != null) { - hs2Conn.close(); + if (conn != null) { + conn.close(); } } @@ -1008,35 +910,35 @@ public void testHttpHeaderSize() throws Exception { // than the configured the header size userName = StringUtils.leftPad("*", 2000); try { - hs2Conn = getConnection(miniHS2.getJdbcURL(), userName, "password"); + conn = getConnection(miniHS2.getJdbcURL(testDbName), userName, "password"); } catch (Exception e) { assertTrue("Header exception thrown", e != null); assertTrue(e.getMessage().contains("HTTP Response code: 413")); } finally { - if (hs2Conn != null) { - hs2Conn.close(); + if (conn != null) { + conn.close(); } } // Stop HiveServer2 to increase header size - if (miniHS2.isStarted()) { - miniHS2.stop(); - } + stopMiniHS2(); conf.setInt("hive.server2.thrift.http.request.header.size", 3000); conf.setInt("hive.server2.thrift.http.response.header.size", 3000); - miniHS2 = new MiniHS2(conf); - miniHS2.start(confOverlay); + startMiniHS2(conf); // This should now go fine, since we increased the configured header size try { - hs2Conn = getConnection(miniHS2.getJdbcURL(), userName, "password"); + conn = getConnection(miniHS2.getJdbcURL(testDbName), userName, "password"); } catch (Exception e) { fail("Not expecting exception: " + e); } finally { - if (hs2Conn != null) { - hs2Conn.close(); + if (conn != null) { + conn.close(); } } + + // Restore original state + restoreMiniHS2AndConnections(); } /** @@ -1047,34 
+949,28 @@ public void testHttpHeaderSize() throws Exception { */ @Test public void testAddJarDataNucleusUnCaching() throws Exception { - Path jarFilePath = new Path(dataFileDir, "identity_udf.jar"); - Connection conn = getConnection(miniHS2.getJdbcURL(), "foo", "bar"); - String tableName = "testAddJar"; + Path jarFilePath = getHiveContribJarPath(); + // We need a new connection object as we'll check the cache size after connection close + Connection conn = + getConnection(miniHS2.getJdbcURL(testDbName), System.getProperty("user.name"), "password"); Statement stmt = conn.createStatement(); - stmt.execute("SET hive.support.concurrency = false"); - // Create table - stmt.execute("DROP TABLE IF EXISTS " + tableName); - stmt.execute("CREATE TABLE " + tableName + " (key INT, value STRING)"); - // Load data - stmt.execute("LOAD DATA LOCAL INPATH '" + kvDataFilePath.toString() + "' INTO TABLE " - + tableName); - ResultSet res = stmt.executeQuery("SELECT * FROM " + tableName); - // Ensure table is populated - assertTrue(res.next()); int mapSizeBeforeClose; int mapSizeAfterClose; // Add the jar file stmt.execute("ADD JAR " + jarFilePath.toString()); // Create a temporary function using the jar - stmt.execute("CREATE TEMPORARY FUNCTION func AS 'IdentityStringUDF'"); + stmt.execute("CREATE TEMPORARY FUNCTION add_func AS '" + testUdfClassName + "'"); + ResultSet res = stmt.executeQuery("DESCRIBE FUNCTION add_func"); + checkForNotExist(res); // Execute the UDF - stmt.execute("SELECT func(value) from " + tableName); + stmt.execute("SELECT add_func(int_col, 1) from " + tableName + " limit 1"); mapSizeBeforeClose = getNucleusClassLoaderResolverMapSize(); System.out .println("classLoaderResolverMap size before connection close: " + mapSizeBeforeClose); // Cache size should be > 0 now Assert.assertTrue(mapSizeBeforeClose > 0); + // Close the connection conn.close(); mapSizeAfterClose = getNucleusClassLoaderResolverMapSize(); System.out.println("classLoaderResolverMap size after connection close: " + mapSizeAfterClose); @@ -1129,34 +1025,7 @@ public void testAddJarConstructorUnCaching() throws Exception { // This test assumes the hive-contrib JAR has been built as part of the Hive build. // Also dependent on the UDFExampleAdd class within that JAR. 
setReflectionUtilCache(); - String udfClassName = "org.apache.hadoop.hive.contrib.udf.example.UDFExampleAdd"; - String mvnRepo = System.getProperty("maven.local.repository"); - String hiveVersion = System.getProperty("hive.version"); - String jarFileName = "hive-contrib-" + hiveVersion + ".jar"; - String[] pathParts = { - "org", "apache", "hive", - "hive-contrib", hiveVersion, jarFileName - }; - - // Create path to hive-contrib JAR on local filesystem - Path jarFilePath = new Path(mvnRepo); - for (String pathPart : pathParts) { - jarFilePath = new Path(jarFilePath, pathPart); - } - - Connection conn = getConnection(miniHS2.getJdbcURL(), "foo", "bar"); - String tableName = "testAddJar"; - Statement stmt = conn.createStatement(); - stmt.execute("SET hive.support.concurrency = false"); - // Create table - stmt.execute("DROP TABLE IF EXISTS " + tableName); - stmt.execute("CREATE TABLE " + tableName + " (key INT, value STRING)"); - // Load data - stmt.execute("LOAD DATA LOCAL INPATH '" + kvDataFilePath.toString() + "' INTO TABLE " - + tableName); - ResultSet res = stmt.executeQuery("SELECT * FROM " + tableName); - // Ensure table is populated - assertTrue(res.next()); + Path jarFilePath = getHiveContribJarPath(); long cacheBeforeAddJar, cacheAfterAddJar, cacheAfterClose; // Force the cache clear so we know its empty @@ -1168,20 +1037,20 @@ public void testAddJarConstructorUnCaching() throws Exception { cacheBeforeAddJar == 0); // Add the jar file + Statement stmt = conTestDb.createStatement(); stmt.execute("ADD JAR " + jarFilePath.toString()); // Create a temporary function using the jar - stmt.execute("CREATE TEMPORARY FUNCTION func AS '" + udfClassName + "'"); + stmt.execute("CREATE TEMPORARY FUNCTION add_func AS '" + testUdfClassName + "'"); // Execute the UDF - res = stmt.executeQuery("SELECT func(value) from " + tableName); + ResultSet res = stmt.executeQuery("SELECT add_func(int_col, 1) from " + tableName + " limit 1"); assertTrue(res.next()); // Check to make sure the cache is now being used cacheAfterAddJar = getReflectionUtilCacheSize(); System.out.println("CONSTRUCTOR_CACHE size after add jar: " + cacheAfterAddJar); - Assert.assertTrue("FAILED: CONSTRUCTOR_CACHE size after connection close: " + cacheAfterAddJar, + Assert.assertTrue("FAILED: CONSTRUCTOR_CACHE size after add jar: " + cacheAfterAddJar, cacheAfterAddJar > 0); - conn.close(); - TimeUnit.SECONDS.sleep(10); + TimeUnit.SECONDS.sleep(7); // Have to force a cleanup of all expired entries here because its possible that the // expired entries will still be counted in Cache.size(). 
// Taken from: @@ -1191,6 +1060,8 @@ public void testAddJarConstructorUnCaching() throws Exception { System.out.println("CONSTRUCTOR_CACHE size after connection close: " + cacheAfterClose); Assert.assertTrue("FAILED: CONSTRUCTOR_CACHE size after connection close: " + cacheAfterClose, cacheAfterClose == 0); + stmt.execute("DROP TEMPORARY FUNCTION IF EXISTS add_func"); + stmt.close(); } private void setReflectionUtilCache() { @@ -1202,8 +1073,11 @@ private void setReflectionUtilCache() { constructorCacheField.setAccessible(true); Field modifiersField = Field.class.getDeclaredField("modifiers"); modifiersField.setAccessible(true); - modifiersField.setInt(constructorCacheField, constructorCacheField.getModifiers() & ~Modifier.FINAL); - tmp = CacheBuilder.newBuilder().expireAfterAccess(5, TimeUnit.SECONDS).concurrencyLevel(64).weakKeys().weakValues().build(); + modifiersField.setInt(constructorCacheField, constructorCacheField.getModifiers() + & ~Modifier.FINAL); + tmp = + CacheBuilder.newBuilder().expireAfterAccess(5, TimeUnit.SECONDS).concurrencyLevel(64) + .weakKeys().weakValues().build(); constructorCacheField.set(tmp.getClass(), tmp); } } catch (Exception e) { @@ -1227,10 +1101,10 @@ private Cache getReflectionUtilCache() { private void invalidateReflectionUtlCache() { try { - Cache constructorCache = getReflectionUtilCache(); - if ( constructorCache != null ) { - constructorCache.invalidateAll(); - } + Cache constructorCache = getReflectionUtilCache(); + if (constructorCache != null) { + constructorCache.invalidateAll(); + } } catch (Exception e) { System.out.println("Error when trying to invalidate the cache: " + e); } @@ -1239,7 +1113,7 @@ private void invalidateReflectionUtlCache() { private void cleanUpReflectionUtlCache() { try { Cache constructorCache = getReflectionUtilCache(); - if ( constructorCache != null ) { + if (constructorCache != null) { constructorCache.cleanUp(); } } catch (Exception e) { @@ -1249,13 +1123,159 @@ private void cleanUpReflectionUtlCache() { private long getReflectionUtilCacheSize() { try { - Cache constructorCache = getReflectionUtilCache(); - if ( constructorCache != null ) { - return constructorCache.size(); - } + Cache constructorCache = getReflectionUtilCache(); + if (constructorCache != null) { + return constructorCache.size(); + } } catch (Exception e) { System.out.println(e); } return -1; } + + @Test + public void testPermFunc() throws Exception { + // This test assumes the hive-contrib JAR has been built as part of the Hive build. + // Also dependent on the UDFExampleAdd class within that JAR. 
+ Path jarFilePath = getHiveContribJarPath(); + Statement stmt = conTestDb.createStatement(); + ResultSet res; + // Add the jar file + stmt.execute("ADD JAR " + jarFilePath.toString()); + + // Register function + String queryStr = + "CREATE FUNCTION example_add AS '" + testUdfClassName + "' USING JAR '" + jarFilePath + "'"; + stmt.execute(queryStr); + + // Call describe + res = stmt.executeQuery("DESCRIBE FUNCTION " + testDbName + ".example_add"); + checkForNotExist(res); + + // Use UDF in query + res = stmt.executeQuery("SELECT example_add(1, 2) FROM " + tableName + " LIMIT 1"); + assertTrue("query has results", res.next()); + assertEquals(3, res.getInt(1)); + assertFalse("no more results", res.next()); + + // A new connection should be able to call describe/use function without issue + Connection conn2 = getConnection(testDbName); + Statement stmt2 = conn2.createStatement(); + stmt2.execute("USE " + testDbName); + res = stmt2.executeQuery("DESCRIBE FUNCTION " + testDbName + ".example_add"); + checkForNotExist(res); + + res = + stmt2.executeQuery("SELECT " + testDbName + ".example_add(1, 1) FROM " + tableName + + " LIMIT 1"); + assertTrue("query has results", res.next()); + assertEquals(2, res.getInt(1)); + assertFalse("no more results", res.next()); + conn2.close(); + stmt.execute("DROP FUNCTION IF EXISTS " + testDbName + ".example_add"); + stmt.close(); + } + + private Path getHiveContribJarPath() { + String mvnRepo = System.getProperty("maven.local.repository"); + String hiveVersion = System.getProperty("hive.version"); + String jarFileName = "hive-contrib-" + hiveVersion + ".jar"; + String[] pathParts = { + "org", "apache", "hive", + "hive-contrib", hiveVersion, jarFileName + }; + + // Create path to hive-contrib JAR on local filesystem + Path jarFilePath = new Path(mvnRepo); + for (String pathPart : pathParts) { + jarFilePath = new Path(jarFilePath, pathPart); + } + return jarFilePath; + } + + @Test + public void testTempTable() throws Exception { + // Create temp table with current connection + String tempTableName = "tmp1"; + Statement stmt = conTestDb.createStatement(); + stmt.execute("CREATE TEMPORARY TABLE " + tempTableName + " (key string, value string)"); + stmt.execute("load data local inpath '" + kvDataFilePath.toString() + "' into table " + + tempTableName); + + String resultVal = "val_238"; + String queryStr = "SELECT * FROM " + tempTableName + " where value = '" + resultVal + "'"; + + ResultSet res = stmt.executeQuery(queryStr); + assertTrue(res.next()); + assertEquals(resultVal, res.getString(2)); + res.close(); + stmt.close(); + + // Test getTables() + DatabaseMetaData md = conTestDb.getMetaData(); + assertTrue(md.getConnection() == conTestDb); + + ResultSet rs = md.getTables(null, null, tempTableName, null); + boolean foundTable = false; + while (rs.next()) { + String tableName = rs.getString(3); + if (tableName.equalsIgnoreCase(tempTableName)) { + assertFalse("Table not found yet", foundTable); + foundTable = true; + } + } + assertTrue("Found temp table", foundTable); + + // Test getTables() with no table name pattern + rs = md.getTables(null, null, null, null); + foundTable = false; + while (rs.next()) { + String tableName = rs.getString(3); + if (tableName.equalsIgnoreCase(tempTableName)) { + assertFalse("Table not found yet", foundTable); + foundTable = true; + } + } + assertTrue("Found temp table", foundTable); + + // Test getColumns() + rs = md.getColumns(null, null, tempTableName, null); + assertTrue("First row", rs.next()); + 
assertTrue(rs.getString(3).equalsIgnoreCase(tempTableName)); + assertTrue(rs.getString(4).equalsIgnoreCase("key")); + assertEquals(Types.VARCHAR, rs.getInt(5)); + + assertTrue("Second row", rs.next()); + assertTrue(rs.getString(3).equalsIgnoreCase(tempTableName)); + assertTrue(rs.getString(4).equalsIgnoreCase("value")); + assertEquals(Types.VARCHAR, rs.getInt(5)); + + // A second connection should not be able to see the table + Connection conn2 = + DriverManager.getConnection(miniHS2.getJdbcURL(testDbName), + System.getProperty("user.name"), "bar"); + Statement stmt2 = conn2.createStatement(); + stmt2.execute("USE " + testDbName); + boolean gotException = false; + try { + res = stmt2.executeQuery(queryStr); + } catch (SQLException err) { + // This is expected to fail. + assertTrue("Expecting table not found error, instead got: " + err, + err.getMessage().contains("Table not found")); + gotException = true; + } + assertTrue("Exception while querying non-existing temp table", gotException); + conn2.close(); + } + + private void checkForNotExist(ResultSet res) throws Exception { + int numRows = 0; + while (res.next()) { + numRows++; + String strVal = res.getString(1); + assertEquals("Should not find 'not exist'", -1, strVal.toLowerCase().indexOf("not exist")); + } + assertTrue("Rows returned from describe function", numRows > 0); + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java index eeba6cd..60318f3 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java @@ -1020,9 +1020,12 @@ public void closeOp(boolean abort) throws HiveException { lastProgressReport = System.currentTimeMillis(); if (!abort) { - // If serializer is ThriftJDBCBinarySerDe, then it buffers rows to a certain limit (hive.server2.thrift.resultset.max.fetch.size) - // and serializes the whole batch when the buffer is full. The serialize returns null if the buffer is not full - // (the size of buffer is kept track of in the ThriftJDBCBinarySerDe). + /** + * If serializer is ThriftJDBCBinarySerDe, then it buffers rows to a certain limit + * (hive.server2.thrift.resultset.max.fetch.size) and serializes the whole batch when the + * buffer is full. The serialize returns null if the buffer is not full (the size of buffer is + * kept track of in the ThriftJDBCBinarySerDe). 
+ */ if (conf.isHiveServerQuery() && HiveConf.getBoolVar(hconf, HiveConf.ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_SERIALIZE_IN_TASKS) && serializer.getClass().getName().equalsIgnoreCase(ThriftJDBCBinarySerDe.class.getName())) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java index 747f387..e5b2f82 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java @@ -1786,7 +1786,7 @@ private void handleInsertStatementSpecPhase1(ASTNode ast, QBParseInfo qbp, Phase public void getMaterializationMetadata(QB qb) throws SemanticException { try { gatherCTEReferences(qb, rootClause); - int threshold = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_CTE_MATERIALIZE_THRESHOLD); + int threshold = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_CTE_MATERIALIZE_THRESHOLD); for (CTEClause cte : Sets.newHashSet(aliasToCTEs.values())) { if (threshold >= 0 && cte.reference >= threshold) { cte.materialize = true; @@ -2495,7 +2495,7 @@ void parseJoinCondPopulateAlias(QBJoinTree joinTree, ASTNode condn, case HiveParser.TOK_CHARSETLITERAL: case HiveParser.KW_TRUE: case HiveParser.KW_FALSE: - break; + break; case HiveParser.TOK_FUNCTION: // check all the arguments @@ -6884,19 +6884,20 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input) } else if (qb.getIsQuery()) { String fileFormat; if (SessionState.get().getIsUsingThriftJDBCBinarySerDe()) { - fileFormat = "SequenceFile"; - HiveConf.setVar(conf, HiveConf.ConfVars.HIVEQUERYRESULTFILEFORMAT, fileFormat); - table_desc= - PlanUtils.getDefaultQueryOutputTableDesc(cols, colTypes, fileFormat, - ThriftJDBCBinarySerDe.class); - // Set the fetch formatter to be a no-op for the ListSinkOperator, since we'll - // write out formatted thrift objects to SequenceFile - conf.set(SerDeUtils.LIST_SINK_OUTPUT_FORMATTER, NoOpFetchFormatter.class.getName()); + LOG.debug("Final results will be written using: " + ThriftJDBCBinarySerDe.class); + fileFormat = "SequenceFile"; + HiveConf.setVar(conf, HiveConf.ConfVars.HIVEQUERYRESULTFILEFORMAT, fileFormat); + table_desc = + PlanUtils.getDefaultQueryOutputTableDesc(cols, colTypes, fileFormat, + ThriftJDBCBinarySerDe.class); + // Set the fetch formatter to be a no-op for the ListSinkOperator, since we'll + // write out formatted thrift objects to SequenceFile + conf.set(SerDeUtils.LIST_SINK_OUTPUT_FORMATTER, NoOpFetchFormatter.class.getName()); } else { - fileFormat = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEQUERYRESULTFILEFORMAT); - table_desc = - PlanUtils.getDefaultQueryOutputTableDesc(cols, colTypes, fileFormat, - LazySimpleSerDe.class); + fileFormat = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEQUERYRESULTFILEFORMAT); + table_desc = + PlanUtils.getDefaultQueryOutputTableDesc(cols, colTypes, fileFormat, + LazySimpleSerDe.class); } } else { table_desc = PlanUtils.getDefaultTableDesc(qb.getDirectoryDesc(), cols, colTypes); @@ -6916,7 +6917,7 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input) } input = genConversionSelectOperator(dest, qb, input, table_desc, dpCtx); - + inputRR = opParseCtx.get(input).getRowResolver(); ArrayList vecCol = new ArrayList(); @@ -10589,7 +10590,7 @@ private void walkASTMarkTABREF(ASTNode ast, Set cteAlias) colNames.add(col.getName()); colTypes.add(col.getType()); } - + basicInfos.put(new HivePrivilegeObject(table.getDbName(), table.getTableName(), colNames), new 
MaskAndFilterInfo(colTypes, additionalTabInfo.toString(), alias, astNode, table.isView())); } @@ -10614,7 +10615,7 @@ private void walkASTMarkTABREF(ASTNode ast, Set cteAlias) } } } - + // We walk through the AST. // We replace all the TOK_TABREF by adding additional masking and filter if // the table needs to be masked or filtered. @@ -10736,7 +10737,7 @@ else if(ast.getChild(0).getType() == HiveParser.TOK_FALSE) { // masking and filtering should be created here // the basic idea is similar to unparseTranslator. tableMask = new TableMask(this, conf, ctx); - + // 4. continue analyzing from the child ASTNode. Phase1Ctx ctx_1 = initPhase1Ctx(); preProcessForInsert(child, qb); diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/thrift/ThriftJDBCBinarySerDe.java b/serde/src/java/org/apache/hadoop/hive/serde2/thrift/ThriftJDBCBinarySerDe.java index 5c31974..b9f8712 100644 --- a/serde/src/java/org/apache/hadoop/hive/serde2/thrift/ThriftJDBCBinarySerDe.java +++ b/serde/src/java/org/apache/hadoop/hive/serde2/thrift/ThriftJDBCBinarySerDe.java @@ -47,9 +47,11 @@ import org.slf4j.LoggerFactory; /** - * This SerDe is used to serialize the final output to thrift-able objects directly in the SerDe. Use this SerDe only for final output resultSets. - * It is used if HIVE_SERVER2_THRIFT_RESULTSET_SERIALIZE_IN_TASKS is set to true. It buffers rows that come in from FileSink till it reaches max_buffer_size (also configurable) - * or all rows are finished and FileSink.closeOp() is called. + * This SerDe is used to serialize the final output to thrift-able objects directly in the SerDe. + * Use this SerDe only for final output resultSets. It is used if + * HIVE_SERVER2_THRIFT_RESULTSET_SERIALIZE_IN_TASKS is set to true. It buffers rows that come in + * from FileSink till it reaches max_buffer_size (also configurable) or all rows are finished and + * FileSink.closeOp() is called. 
 */ public class ThriftJDBCBinarySerDe extends AbstractSerDe { public static final Logger LOG = LoggerFactory.getLogger(ThriftJDBCBinarySerDe.class.getName()); @@ -69,8 +71,10 @@ @Override public void initialize(Configuration conf, Properties tbl) throws SerDeException { + LOG.debug("Initializing " + ThriftJDBCBinarySerDe.class); // Get column names - MAX_BUFFERED_ROWS = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_MAX_FETCH_SIZE); + MAX_BUFFERED_ROWS = + HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_MAX_FETCH_SIZE); String columnNameProperty = tbl.getProperty(serdeConstants.LIST_COLUMNS); String columnTypeProperty = tbl.getProperty(serdeConstants.LIST_COLUMN_TYPES); if (columnNameProperty.length() == 0) { @@ -102,32 +106,35 @@ public void initialize(Configuration conf, Properties tbl) throws SerDeException } private Writable serializeBatch() throws SerDeException { - output.reset(); - for (int i = 0; i < columnBuffers.length; i++) { - TColumn tColumn = columnBuffers[i].toTColumn(); - try { - tColumn.write(protocol); - } catch(TException e) { - throw new SerDeException(e); - } - } - initializeRowAndColumns(); - serializedBytesWritable.set(output.getData(), 0, output.getLength()); - return serializedBytesWritable; + output.reset(); + for (int i = 0; i < columnBuffers.length; i++) { + TColumn tColumn = columnBuffers[i].toTColumn(); + try { + tColumn.write(protocol); + } catch (TException e) { + throw new SerDeException(e); + } + } + initializeRowAndColumns(); + serializedBytesWritable.set(output.getData(), 0, output.getLength()); + return serializedBytesWritable; } - // use the columnNames to initialize the reusable row object and the columnBuffers. reason this is being done is if buffer is full, we should reinitialize the - // column buffers, otherwise at the end when closeOp() is called, things get printed multiple times. + /** + * Use the columnNames to initialize the reusable row object and the columnBuffers. This is + * done because, if the buffer is full, the column buffers must be reinitialized; otherwise rows + * get emitted multiple times when closeOp() is called at the end. + */ private void initializeRowAndColumns() { - row = new ArrayList(columnNames.size()); - for (int i = 0; i < columnNames.size(); i++) { - row.add(null); - } - // Initialize column buffers - columnBuffers = new ColumnBuffer[columnNames.size()]; - for (int i = 0; i < columnBuffers.length; i++) { - columnBuffers[i] = new ColumnBuffer(Type.getType(columnTypes.get(i))); - } + row = new ArrayList(columnNames.size()); + for (int i = 0; i < columnNames.size(); i++) { + row.add(null); + } + // Initialize column buffers + columnBuffers = new ColumnBuffer[columnNames.size()]; + for (int i = 0; i < columnBuffers.length; i++) { + columnBuffers[i] = new ColumnBuffer(Type.getType(columnTypes.get(i))); + } } /** @@ -135,23 +142,24 @@ private void initializeRowAndColumns() { */ @Override public Writable serialize(Object obj, ObjectInspector objInspector) throws SerDeException { - //if row is null, it means there are no more rows (closeOp()). another case can be that the buffer is full. + // If row is null, it means there are no more rows (closeOp()); the other case is that the + // buffer is full.
if (obj == null) - return serializeBatch(); + return serializeBatch(); count += 1; StructObjectInspector soi = (StructObjectInspector) objInspector; List fields = soi.getAllStructFieldRefs(); try { - Object[] formattedRow = (Object[]) thriftFormatter.convert(obj, objInspector); - for (int i = 0; i < columnNames.size(); i++) { - columnBuffers[i].addValue(formattedRow[i]); - } + Object[] formattedRow = (Object[]) thriftFormatter.convert(obj, objInspector); + for (int i = 0; i < columnNames.size(); i++) { + columnBuffers[i].addValue(formattedRow[i]); + } } catch (Exception e) { - throw new SerDeException(e); + throw new SerDeException(e); } if (count == MAX_BUFFERED_ROWS) { - count = 0; - return serializeBatch(); + count = 0; + return serializeBatch(); } return null; }
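
The buffer-and-flush contract described in the ThriftJDBCBinarySerDe comments above (accumulate rows up to hive.server2.thrift.resultset.max.fetch.size, emit the whole batch when the buffer fills, and flush the remainder when FileSinkOperator.closeOp() passes a null row) can be illustrated with a minimal standalone sketch. This is not Hive code: the BatchingSerializer class, its encodeBatch() helper, and the plain CSV encoding are hypothetical stand-ins for ThriftJDBCBinarySerDe, serializeBatch(), and the Thrift TColumn encoding.

import java.util.ArrayList;
import java.util.List;

// Minimal sketch (hypothetical names) of the buffering pattern used by ThriftJDBCBinarySerDe:
// serialize() returns null while rows are still being buffered, and returns the encoded batch
// when the buffer is full or when a null row signals end of input (the closeOp() case).
public class BatchingSerializer {
  private final int maxBufferedRows; // stands in for hive.server2.thrift.resultset.max.fetch.size
  private final List<String[]> buffer = new ArrayList<>();

  public BatchingSerializer(int maxBufferedRows) {
    this.maxBufferedRows = maxBufferedRows;
  }

  // Returns the serialized batch when it is time to flush, otherwise null.
  public byte[] serialize(String[] row) {
    if (row == null) {
      // End of input: flush whatever has been buffered so far.
      return encodeBatch();
    }
    buffer.add(row);
    if (buffer.size() == maxBufferedRows) {
      // Buffer is full: flush and start a new batch.
      return encodeBatch();
    }
    return null; // keep buffering
  }

  // Encodes the buffered rows (trivially, as CSV) and resets the buffer for the next batch.
  private byte[] encodeBatch() {
    StringBuilder sb = new StringBuilder();
    for (String[] row : buffer) {
      sb.append(String.join(",", row)).append('\n');
    }
    buffer.clear();
    return sb.toString().getBytes();
  }

  public static void main(String[] args) {
    BatchingSerializer serde = new BatchingSerializer(2);
    System.out.println(serde.serialize(new String[] {"238", "val_238"}));           // null, still buffering
    System.out.println(new String(serde.serialize(new String[] {"86", "val_86"}))); // full batch of two rows
    serde.serialize(new String[] {"311", "val_311"});                               // null, still buffering
    System.out.println(new String(serde.serialize(null)));                          // closeOp(): final partial batch
  }
}

In the real SerDe the flush path writes each ColumnBuffer as a TColumn to the Thrift protocol and then calls initializeRowAndColumns() to reset the buffers; the buffer.clear() call above stands in for that reset step.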