diff --git a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java index 1708df1..02f88c2 100644 --- a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java +++ b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java @@ -30,12 +30,12 @@ import java.sql.Statement; import java.util.ArrayList; import java.util.Collection; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Random; import java.util.Set; import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingQueue; import org.apache.commons.collections4.ListUtils; @@ -96,7 +96,8 @@ private static final Logger LOG = LoggerFactory.getLogger(LlapBaseInputFormat.class); private static String driverName = "org.apache.hive.jdbc.HiveDriver"; - private static final Map connectionMap = new ConcurrentHashMap(); + private static final Object lock = new Object(); + private static final Map> connectionMap = new HashMap>(); private String url; // "jdbc:hive2://localhost:10000/default" private String user; // "hive", @@ -222,10 +223,6 @@ public LlapBaseInputFormat() {} handleId = UUID.randomUUID().toString(); LOG.info("Handle ID not specified - generated handle ID {}", handleId); } - // Check if the handle has already been used in the registry - if (connectionMap.containsKey(handleId)) { - throw new IllegalStateException("Handle ID " + handleId + " has already been used. This must be unique."); - } try { Class.forName(driverName); @@ -261,20 +258,24 @@ public LlapBaseInputFormat() {} // Keep connection open to hang on to associated resources (temp tables, locks). // Save to connectionMap so it can be closed at user's convenience. - Connection putResult = connectionMap.putIfAbsent(handleId, conn); - // Hopefully no one has used the same handle ID during the time that we executed the statement. - if (putResult != null) { - String msg = "Handle ID " + handleId + " has already been used. This must be unique."; - LOG.error(msg); - conn.close(); - throw new IllegalStateException(msg); - } + addConnection(handleId, conn); } catch (Exception e) { throw new IOException(e); } return ins.toArray(new InputSplit[ins.size()]); } + private void addConnection(String handleId, Connection connection) { + synchronized (lock) { + List handleConnections = connectionMap.get(handleId); + if (handleConnections == null) { + handleConnections = new ArrayList(); + connectionMap.put(handleId, handleConnections); + } + handleConnections.add(connection); + } + } + /** * Close the connection associated with the handle ID, if getSplits() was configured with a handle ID. * Call when the application is done using the splits generated by getSplits(). @@ -282,16 +283,21 @@ public LlapBaseInputFormat() {} * @throws IOException */ public static void close(String handleId) throws IOException { - Connection conn = connectionMap.remove(handleId); - if (conn != null) { - try { - LOG.debug("Closing connection for handle ID {}", handleId); - conn.close(); - } catch (Exception err) { - throw new IOException(err); + List handleConnections; + synchronized (lock) { + handleConnections = connectionMap.remove(handleId); + } + if (handleConnections != null) { + LOG.debug("Closing {} connections for handle ID {}", handleConnections.size(), handleId); + for (Connection conn : handleConnections) { + try { + conn.close(); + } catch (Exception err) { + LOG.error("Error while closing connection for " + handleId, err); + } } } else { - LOG.info("No connection found for handle ID {}", handleId); + LOG.debug("No connection found for handle ID {}", handleId); } }