diff --git itests/hive-unit/src/test/java/org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java itests/hive-unit/src/test/java/org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java index d2e9514dda..ab79b42aa9 100644 --- itests/hive-unit/src/test/java/org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java +++ itests/hive-unit/src/test/java/org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java @@ -106,7 +106,7 @@ private static Path kvDataFilePath; private static Path dataTypesFilePath; - private static HiveConf conf = null; + protected static HiveConf conf = null; private static Connection hs2Conn = null; // This method should be called by sub-classes in a @BeforeClass initializer @@ -160,7 +160,7 @@ public static void afterTest() throws Exception { } } - private void createTestTable(String tableName) throws Exception { + protected void createTestTable(String tableName) throws Exception { createTestTable(hs2Conn, null, tableName, kvDataFilePath.toString()); } diff --git itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java index b69a2f94d0..1b088e2d2a 100644 --- itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java +++ itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java @@ -21,12 +21,19 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertArrayEquals; import java.math.BigDecimal; + +import com.google.common.collect.Iterables; import org.apache.hadoop.hive.common.type.Date; import org.apache.hadoop.hive.common.type.Timestamp; + +import java.util.Iterator; import java.util.List; import org.apache.hadoop.hive.llap.FieldDesc; +import org.apache.hadoop.hive.llap.LlapBaseInputFormat; import org.apache.hadoop.hive.llap.Row; import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; import org.junit.BeforeClass; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; @@ -35,6 +42,9 @@ import java.sql.SQLException; import java.sql.Statement; import java.sql.Connection; +import java.util.concurrent.Callable; +import java.util.stream.IntStream; + import org.apache.hadoop.hive.ql.exec.UDF; import org.apache.hadoop.mapred.InputFormat; import org.apache.hadoop.hive.llap.LlapArrowRowInputFormat; @@ -427,5 +437,84 @@ public void testKillQueryByTagOwner() throws Exception { assertNotNull("tExecute", tExecuteHolder.throwable); assertNull("tCancel", tKillHolder.throwable); } + + @Test + public void testConcurrentAddAndCloseAndCloseAllConnections() throws Exception { + createTestTable("testtab1"); + + String url = miniHS2.getJdbcURL(); + String user = System.getProperty("user.name"); + String pwd = user; + + InputFormat inputFormat = getInputFormat(); + + // Get splits + JobConf job = new JobConf(conf); + job.set(LlapBaseInputFormat.URL_KEY, url); + job.set(LlapBaseInputFormat.USER_KEY, user); + job.set(LlapBaseInputFormat.PWD_KEY, pwd); + job.set(LlapBaseInputFormat.QUERY_KEY, "select * from testtab1"); + + final String[] handleIds = IntStream.range(0, 20).boxed().map(i -> "handleId-" + i).toArray(String[]::new); + + final ExceptionHolder exceptionHolder = new ExceptionHolder(); + + // addConnThread thread will keep adding connections + // closeConnThread thread tries close connection(s) associated to handleIds, one at a time + // closeAllConnThread thread tries to close All at once. + + final int numIterations = 100; + final Iterator addConnIterator = Iterables.cycle(handleIds).iterator(); + Thread addConnThread = new Thread(() -> executeNTimes(() -> { + String handleId = addConnIterator.next(); + job.set(LlapBaseInputFormat.HANDLE_ID, handleId); + InputSplit[] splits = inputFormat.getSplits(job, 1); + assertTrue(splits.length > 0); + return null; + }, numIterations, 1, exceptionHolder)); + + final Iterator removeConnIterator = Iterables.cycle(handleIds).iterator(); + Thread closeConnThread = new Thread(() -> executeNTimes(() -> { + String handleId = removeConnIterator.next(); + LlapBaseInputFormat.close(handleId); + return null; + }, numIterations, 2, exceptionHolder)); + + Thread closeAllConnThread = new Thread(() -> executeNTimes(() -> { + LlapBaseInputFormat.closeAll(); + return null; + }, numIterations, 5, exceptionHolder)); + + addConnThread.start(); + closeConnThread.start(); + closeAllConnThread.start(); + + closeAllConnThread.join(); + closeConnThread.join(); + addConnThread.join(); + + Throwable throwable = exceptionHolder.throwable; + assertNull("Something went wrong while testAddCloseCloseAllConnections" + throwable, throwable); + + } + + private void executeNTimes(Callable action, int noOfTimes, long intervalMillis, ExceptionHolder exceptionHolder) { + for (int i = 0; i < noOfTimes; i++) { + try { + action.call(); + Thread.sleep(intervalMillis); + } catch (Exception e) { + // populate first exception only + if (exceptionHolder.throwable == null) { + synchronized (this) { + if (exceptionHolder.throwable == null) { + exceptionHolder.throwable = e; + } + } + } + } + } + } + } diff --git llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java index ff91f474dd..2aa82b58f2 100644 --- llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java +++ llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java @@ -31,6 +31,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Random; @@ -326,6 +327,10 @@ public static void close(String handleId) throws IOException { synchronized (lock) { handleConnections = connectionMap.remove(handleId); } + closeConnections(handleId, handleConnections); + } + + private static void closeConnections(String handleId, List handleConnections) { if (handleConnections != null) { LOG.debug("Closing {} connections for handle ID {}", handleConnections.size(), handleId); for (Connection conn : handleConnections) { @@ -345,11 +350,13 @@ public static void close(String handleId) throws IOException { */ public static void closeAll() { LOG.debug("Closing all handles"); - for (String handleId : connectionMap.keySet()) { - try { - close(handleId); - } catch (Exception err) { - LOG.error("Error closing handle ID " + handleId, err); + synchronized (lock) { + Iterator>> itr = connectionMap.entrySet().iterator(); + Map.Entry> connHandle = null; + while (itr.hasNext()) { + connHandle = itr.next(); + closeConnections(connHandle.getKey(), connHandle.getValue()); + itr.remove(); } } }