diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java index 017f565..b398ea6 100644 --- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java +++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java @@ -47,6 +47,10 @@ import org.apache.thrift.TException; import java.io.IOException; +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.Collection; @@ -532,7 +536,7 @@ private static IMetaStoreClient getMetaStoreClient(HiveEndPoint endPoint, HiveCo conf.setBoolVar(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL,true); } try { - return HCatUtil.getHiveMetastoreClient(conf); + return newSynchronizedClient(HCatUtil.getHiveMetastoreClient(conf)); } catch (MetaException e) { throw new ConnectionError("Error connecting to Hive Metastore URI: " + endPoint.metaStoreUri + ". " + e.getMessage(), e); @@ -542,8 +546,42 @@ private static IMetaStoreClient getMetaStoreClient(HiveEndPoint endPoint, HiveCo } } + public static IMetaStoreClient newSynchronizedClient(IMetaStoreClient client) { + return (IMetaStoreClient) Proxy.newProxyInstance(IMetaStoreClient.class.getClassLoader(), + new Class[] { IMetaStoreClient.class }, new SynchronizedHandler(client)); + } + + private static class SynchronizedHandler implements InvocationHandler { + private final IMetaStoreClient client; + + SynchronizedHandler(IMetaStoreClient client) { + this.client = client; + } + + @Override + public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { + try { + synchronized (client) { + return method.invoke(client, args); + } + } catch (InvocationTargetException e) { + if (e.getTargetException() instanceof MetaException) { + throw (MetaException) e.getTargetException(); + } else if (e.getTargetException() instanceof TException) { + throw (TException) e.getTargetException(); + } else { + // should not happen + throw new TException("Error in calling method " + method.getName(), + e.getTargetException()); + } + } catch (Exception e) { + throw new TException("Error in calling method " + method.getName(), e); + } + } + } } // class ConnectionImpl + private static class TransactionBatchImpl implements TransactionBatch { private final String username; private final UserGroupInformation ugi; @@ -572,7 +610,7 @@ private static IMetaStoreClient getMetaStoreClient(HiveEndPoint endPoint, HiveCo * @throws TransactionBatchUnAvailable if failed to acquire a new Transaction batch */ private TransactionBatchImpl(final String user, UserGroupInformation ugi, HiveEndPoint endPt, - final int numTxns, final IMetaStoreClient msClient, RecordWriter recordWriter, + final int numTxns, final IMetaStoreClient msClient, RecordWriter recordWriter, String agentInfo) throws StreamingException, TransactionBatchUnAvailable, InterruptedException { boolean success = false; @@ -1045,5 +1083,4 @@ private static void setHiveConf(HiveConf conf, HiveConf.ConfVars var, boolean va conf.setBoolVar(var, value); } - } // class HiveEndPoint