diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
index cb64fff..1ba5194 100644
--- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
@@ -97,9 +97,37 @@ public HiveEndPoint(String metaStoreUri


   /**
+   * @deprecated Use {@link #newConnection(boolean, String)}
+   */
+  public StreamingConnection newConnection(final boolean createPartIfNotExists)
+    throws ConnectionError, InvalidPartition, InvalidTable, PartitionCreationFailed
+    , ImpersonationFailed , InterruptedException {
+    return newConnection(createPartIfNotExists, null, null, null);
+  }
+  /**
+   * @deprecated Use {@link #newConnection(boolean, HiveConf, String)}
+   */
+  public StreamingConnection newConnection(final boolean createPartIfNotExists, HiveConf conf)
+    throws ConnectionError, InvalidPartition, InvalidTable, PartitionCreationFailed
+    , ImpersonationFailed , InterruptedException {
+    return newConnection(createPartIfNotExists, conf, null, null);
+  }
+  /**
+   * @deprecated Use {@link #newConnection(boolean, HiveConf, UserGroupInformation, String)}
+   */
+  public StreamingConnection newConnection(final boolean createPartIfNotExists, final HiveConf conf,
+                                           final UserGroupInformation authenticatedUser)
+    throws ConnectionError, InvalidPartition,
+    InvalidTable, PartitionCreationFailed, ImpersonationFailed , InterruptedException {
+    return newConnection(createPartIfNotExists, conf, authenticatedUser, null);
+  }
+  /**
    * Acquire a new connection to MetaStore for streaming
    * @param createPartIfNotExists If true, the partition specified in the endpoint
    *                              will be auto created if it does not exist
+   * @param agentInfo should uniquely identify the process/entity that is using this batch.  This
+   *                  should be something that can be correlated with calling application log files
+   *                  and/or monitoring consoles.
    * @return
    * @throws ConnectionError if problem connecting
    * @throws InvalidPartition if specified partition is not valid (createPartIfNotExists = false)
@@ -108,10 +136,10 @@ public HiveEndPoint(String metaStoreUri
    * @throws PartitionCreationFailed if failed to create partition
    * @throws InterruptedException
    */
-  public StreamingConnection newConnection(final boolean createPartIfNotExists)
-    throws ConnectionError, InvalidPartition, InvalidTable, PartitionCreationFailed
-    , ImpersonationFailed , InterruptedException {
-    return newConnection(createPartIfNotExists, null, null);
+  public StreamingConnection newConnection(final boolean createPartIfNotExists, String agentInfo)
+    throws ConnectionError, InvalidPartition, InvalidTable, PartitionCreationFailed
+    , ImpersonationFailed , InterruptedException {
+    return newConnection(createPartIfNotExists, null, null, agentInfo);
   }

   /**
@@ -119,6 +147,9 @@ public StreamingConnection newConnection(final boolean createPartIfNotExists)
    * @param createPartIfNotExists If true, the partition specified in the endpoint
    *                              will be auto created if it does not exist
    * @param conf HiveConf object, set it to null if not using advanced hive settings.
+   * @param agentInfo should uniquely identify the process/entity that is using this batch.  This
+   *                  should be something that can be correlated with calling application log files
+   *                  and/or monitoring consoles.
   * @return
   * @throws ConnectionError if problem connecting
   * @throws InvalidPartition if specified partition is not valid (createPartIfNotExists = false)
@@ -127,10 +158,10 @@ public StreamingConnection newConnection(final boolean createPartIfNotExists)
   * @throws PartitionCreationFailed if failed to create partition
   * @throws InterruptedException
   */
-  public StreamingConnection newConnection(final boolean createPartIfNotExists, HiveConf conf)
+  public StreamingConnection newConnection(final boolean createPartIfNotExists, HiveConf conf, String agentInfo)
          throws ConnectionError, InvalidPartition, InvalidTable, PartitionCreationFailed
          , ImpersonationFailed , InterruptedException {
-    return newConnection(createPartIfNotExists, conf, null);
+    return newConnection(createPartIfNotExists, conf, null, agentInfo);
  }

  /**
@@ -144,6 +175,9 @@ public StreamingConnection newConnection(final boolean createPartIfNotExists, Hi
   * @param conf HiveConf object to be used for the connection. Can be null.
   * @param authenticatedUser UserGroupInformation object obtained from successful authentication.
   *                          Uses non-secure mode if this argument is null.
+   * @param agentInfo should uniquely identify the process/entity that is using this batch.  This
+   *                  should be something that can be correlated with calling application log files
+   *                  and/or monitoring consoles.
   * @return
   * @throws ConnectionError if there is a connection problem
   * @throws InvalidPartition if specified partition is not valid (createPartIfNotExists = false)
@@ -153,12 +187,12 @@ public StreamingConnection newConnection(final boolean createPartIfNotExists, Hi
   * @throws InterruptedException
   */
  public StreamingConnection newConnection(final boolean createPartIfNotExists, final HiveConf conf,
-                                          final UserGroupInformation authenticatedUser)
+                                          final UserGroupInformation authenticatedUser, final String agentInfo)
          throws ConnectionError, InvalidPartition,
               InvalidTable, PartitionCreationFailed, ImpersonationFailed , InterruptedException {

    if( authenticatedUser==null ) {
-      return newConnectionImpl(authenticatedUser, createPartIfNotExists, conf);
+      return newConnectionImpl(authenticatedUser, createPartIfNotExists, conf, agentInfo);
    }

    try {
@@ -168,7 +202,7 @@ public StreamingConnection newConnection(final boolean createPartIfNotExists, fi
                public StreamingConnection run()
                        throws ConnectionError, InvalidPartition, InvalidTable
                        , PartitionCreationFailed {
-                  return newConnectionImpl(authenticatedUser, createPartIfNotExists, conf);
+                  return newConnectionImpl(authenticatedUser, createPartIfNotExists, conf, agentInfo);
                }
              }
      );
@@ -178,10 +212,10 @@ public StreamingConnection run()
  }

  private StreamingConnection newConnectionImpl(UserGroupInformation ugi,
-                                               boolean createPartIfNotExists, HiveConf conf)
+                                               boolean createPartIfNotExists, HiveConf conf, String agentInfo)
          throws ConnectionError, InvalidPartition, InvalidTable
          , PartitionCreationFailed {
-    return new ConnectionImpl(this, ugi, conf, createPartIfNotExists);
+    return new ConnectionImpl(this, ugi, conf, createPartIfNotExists, agentInfo);
  }

  private static UserGroupInformation getUserGroupInfo(String user)
@@ -250,6 +284,7 @@ public String toString() {
    private final UserGroupInformation ugi;
    private final String username;
    private final boolean secureMode;
+    private final String agentInfo;

    /**
     * @param endPoint end point to connect to
@@ -262,11 +297,12 @@ public String toString() {
     * @throws PartitionCreationFailed if createPart=true and not able to create partition
     */
    private ConnectionImpl(HiveEndPoint endPoint,
                           UserGroupInformation ugi,
-                          HiveConf conf, boolean createPart)
+                          HiveConf conf, boolean createPart, String agentInfo)
            throws ConnectionError, InvalidPartition, InvalidTable
                   , PartitionCreationFailed {
      this.endPt = endPoint;
      this.ugi = ugi;
+      this.agentInfo = agentInfo;
      this.username = ugi==null ? System.getProperty("user.name") : ugi.getShortUserName();
      if (conf==null) {
        conf = HiveEndPoint.createHiveConf(this.getClass(), endPoint.metaStoreUri);
@@ -397,7 +433,7 @@ private TransactionBatch fetchTransactionBatchImpl(int numTransactions,
                                                       RecordWriter recordWriter)
            throws StreamingException, TransactionBatchUnAvailable, InterruptedException {
      return new TransactionBatchImpl(username, ugi, endPt, numTransactions, msClient
-              , recordWriter);
+              , recordWriter, agentInfo);
    }


@@ -530,6 +566,7 @@ private static IMetaStoreClient getMetaStoreClient(HiveEndPoint endPoint, HiveCo
     * file backing this batch any more.  This guards important public methods
     */
    private volatile boolean isClosed = false;
+    private final String agentInfo;

    /**
     * Represents a batch of transactions acquired from MetaStore
@@ -537,8 +574,9 @@ private static IMetaStoreClient getMetaStoreClient(HiveEndPoint endPoint, HiveCo
     * @throws StreamingException if failed to create new RecordUpdater for batch
     * @throws TransactionBatchUnAvailable if failed to acquire a new Transaction batch
     */
-    private TransactionBatchImpl(final String user, UserGroupInformation ugi, HiveEndPoint endPt
-            , final int numTxns, final IMetaStoreClient msClient, RecordWriter recordWriter)
+    private TransactionBatchImpl(final String user, UserGroupInformation ugi, HiveEndPoint endPt,
+        final int numTxns, final IMetaStoreClient msClient, RecordWriter recordWriter,
+        String agentInfo)
            throws StreamingException, TransactionBatchUnAvailable, InterruptedException {
      boolean success = false;
      try {
@@ -554,6 +592,7 @@ private TransactionBatchImpl(final String user, UserGroupInformation ugi, HiveEn
        this.endPt = endPt;
        this.msClient = msClient;
        this.recordWriter = recordWriter;
+        this.agentInfo = agentInfo;

        txnIds = openTxnImpl(msClient, user, numTxns, ugi);

@@ -628,7 +667,7 @@ private void beginNextTransactionImpl() throws TransactionError {
                " current batch for end point : " + endPt);
      ++currentTxnIndex;
      state = TxnState.OPEN;
-      lockRequest = createLockRequest(endPt, partNameForLock, username, getCurrentTxnId());
+      lockRequest = createLockRequest(endPt, partNameForLock, username, getCurrentTxnId(), agentInfo);
      try {
        LockResponse res = msClient.lock(lockRequest);
        if (res.getState() != LockState.ACQUIRED) {
@@ -957,8 +996,9 @@ public Void run() throws StreamingException {
    }

    private static LockRequest createLockRequest(final HiveEndPoint hiveEndPoint,
-            String partNameForLock, String user, long txnId) {
-      LockRequestBuilder rqstBuilder = new LockRequestBuilder();
+            String partNameForLock, String user, long txnId, String agentInfo) {
+      LockRequestBuilder rqstBuilder = agentInfo == null ?
+              new LockRequestBuilder() : new LockRequestBuilder(agentInfo);
      rqstBuilder.setUser(user);
      rqstBuilder.setTransactionId(txnId);

diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/StreamingIntegrationTester.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/StreamingIntegrationTester.java
index bf2cc63..3acfa35 100644
--- a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/StreamingIntegrationTester.java
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/StreamingIntegrationTester.java
@@ -276,7 +276,7 @@ private void go() {
    public void run() {
      StreamingConnection conn = null;
      try {
-        conn = endPoint.newConnection(true);
+        conn = endPoint.newConnection(true, "UT_" + Thread.currentThread().getName());
        RecordWriter writer = new DelimitedInputWriter(cols, ",", endPoint);
        for (int i = 0; i < batches; i++) {
diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
index 4d2a2ee..6e9582b 100644
--- a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
@@ -49,6 +49,8 @@ import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse;
+import org.apache.hadoop.hive.metastore.api.LockState;
+import org.apache.hadoop.hive.metastore.api.LockType;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.ShowLocksRequest;
@@ -58,6 +60,7 @@ import org.apache.hadoop.hive.metastore.api.TxnInfo;
 import org.apache.hadoop.hive.metastore.api.TxnState;
 import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
 import org.apache.hadoop.hive.ql.CommandNeedRetryException;
 import org.apache.hadoop.hive.ql.Driver;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
@@ -302,7 +305,7 @@ public void testBucketingWhereBucketColIsNotFirstCol() throws Exception {
        "ss_customer_sk", "ss_cdemo_sk", "ss_hdemo_sk", "ss_addr_sk", "ss_store_sk", "ss_promo_sk",
        "ss_ticket_number", "ss_quantity", "ss_wholesale_cost", "ss_list_price", "ss_sales_price",
        "ss_ext_discount_amt", "ss_ext_sales_price", "ss_ext_wholesale_cost", "ss_ext_list_price", "ss_ext_tax", "ss_coupon_amt", "ss_net_paid", "ss_net_paid_inc_tax", "ss_net_profit"},",", endPt);
-    StreamingConnection connection = endPt.newConnection(false, null);//should this really be null?
+    StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName());

    TransactionBatch txnBatch = connection.fetchTransactionBatch(2, writer);
    txnBatch.beginNextTransaction();
@@ -376,7 +379,7 @@ public void testStreamBucketingMatchesRegularBucketing() throws Exception {
    HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, "testBucketing3", "streamedtable", null);
    String[] colNames1 = new String[] { "key1", "key2", "data" };
    DelimitedInputWriter wr = new DelimitedInputWriter(colNames1,",", endPt);
-    StreamingConnection connection = endPt.newConnection(false);
+    StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName());
    TransactionBatch txnBatch = connection.fetchTransactionBatch(2, wr);
    txnBatch.beginNextTransaction();

@@ -427,14 +430,14 @@ public void testTableValidation() throws Exception {

    try {
      HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, "testBucketing3", "validation1", null);
-      endPt.newConnection(false);
+      endPt.newConnection(false, "UT_" + Thread.currentThread().getName());
      Assert.assertTrue("InvalidTable exception was not thrown", false);
    } catch (InvalidTable e) {
      // expecting this exception
    }
    try {
      HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, "testBucketing3", "validation2", null);
-      endPt.newConnection(false);
+      endPt.newConnection(false, "UT_" + Thread.currentThread().getName());
      Assert.assertTrue("InvalidTable exception was not thrown", false);
    } catch (InvalidTable e) {
      // expecting this exception
@@ -498,17 +501,17 @@ private void checkNothingWritten(Path partitionPath) throws Exception {
  public void testEndpointConnection() throws Exception {
    // For partitioned table, partitionVals are specified
    HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName, partitionVals);
-    StreamingConnection connection = endPt.newConnection(false, null); //shouldn't throw
+    StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName()); //shouldn't throw
    connection.close();

    // For unpartitioned table, partitionVals are not specified
    endPt = new HiveEndPoint(metaStoreURI, dbName2, tblName2, null);
-    endPt.newConnection(false, null).close(); // should not throw
+    endPt.newConnection(false, "UT_" + Thread.currentThread().getName()).close(); // should not throw

    // For partitioned table, partitionVals are not specified
    try {
      endPt = new HiveEndPoint(metaStoreURI, dbName, tblName, null);
-      connection = endPt.newConnection(true);
+      connection = endPt.newConnection(true, "UT_" + Thread.currentThread().getName());
      Assert.assertTrue("ConnectionError was not thrown", false);
      connection.close();
    } catch (ConnectionError e) {
@@ -520,7 +523,7 @@ public void testEndpointConnection() throws Exception {
    // For unpartitioned table, partition values are specified
    try {
      endPt = new HiveEndPoint(metaStoreURI, dbName2, tblName2, partitionVals);
-      connection = endPt.newConnection(false);
+      connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName());
      Assert.assertTrue("ConnectionError was not thrown", false);
      connection.close();
    } catch (ConnectionError e) {
@@ -548,7 +551,7 @@ public void testAddPartition() throws Exception {
    }

    // Create partition
-    Assert.assertNotNull(endPt.newConnection(true, null));
+    Assert.assertNotNull(endPt.newConnection(true, "UT_" + Thread.currentThread().getName()));

    // Ensure partition is present
    Partition p = msClient.getPartition(endPt.database, endPt.table, endPt.partitionVals);
@@ -561,7 +564,7 @@ public void testTransactionBatchEmptyCommit() throws Exception {
    HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName, partitionVals);
    DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt);
-    StreamingConnection connection = endPt.newConnection(false, null);
+    StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName());

    TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer);
@@ -575,7 +578,7 @@ public void testTransactionBatchEmptyCommit() throws Exception {
    // 2) To unpartitioned table
    endPt = new HiveEndPoint(metaStoreURI, dbName2, tblName2, null);
    writer = new DelimitedInputWriter(fieldNames2,",", endPt);
-    connection = endPt.newConnection(false, null);
+    connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName());

    txnBatch = connection.fetchTransactionBatch(10, writer);
    txnBatch.beginNextTransaction();
@@ -594,7 +597,7 @@ public void testTransactionBatchEmptyCommit() throws Exception {
  public void testTimeOutReaper() throws Exception {
    HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName2, tblName2, null);
    DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames2,",", endPt);
-    StreamingConnection connection = endPt.newConnection(false, null);
+    StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName());

    TransactionBatch txnBatch = connection.fetchTransactionBatch(5, writer);
    txnBatch.beginNextTransaction();
@@ -640,7 +643,7 @@ public void testTimeOutReaper() throws Exception {
  public void testHeartbeat() throws Exception {
    HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName2, tblName2, null);
    DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames2,",", endPt);
-    StreamingConnection connection = endPt.newConnection(false, null);
+    StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName());

    TransactionBatch txnBatch = connection.fetchTransactionBatch(5, writer);
    txnBatch.beginNextTransaction();
@@ -669,7 +672,7 @@ public void testTransactionBatchEmptyAbort() throws Exception {

    HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName, partitionVals);
    DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt);
-    StreamingConnection connection = endPt.newConnection(true);
+    StreamingConnection connection = endPt.newConnection(true, "UT_" + Thread.currentThread().getName());

    TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer);
    txnBatch.beginNextTransaction();
@@ -682,7 +685,7 @@ public void testTransactionBatchEmptyAbort() throws Exception {
    // 2) to unpartitioned table
    endPt = new HiveEndPoint(metaStoreURI, dbName2, tblName2, null);
    writer = new DelimitedInputWriter(fieldNames,",", endPt);
-    connection = endPt.newConnection(true);
+    connection = endPt.newConnection(true, "UT_" + Thread.currentThread().getName());

    txnBatch = connection.fetchTransactionBatch(10, writer);
    txnBatch.beginNextTransaction();
@@ -698,7 +701,7 @@ public void testTransactionBatchCommit_Delimited() throws Exception {
    HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName, partitionVals);
    DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt);
-    StreamingConnection connection = endPt.newConnection(true);
+    StreamingConnection connection = endPt.newConnection(true, "UT_" + Thread.currentThread().getName());

    // 1st Txn
    TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer);
@@ -738,7 +741,7 @@ public void testTransactionBatchCommit_Delimited() throws Exception {
    // To Unpartitioned table
    endPt = new HiveEndPoint(metaStoreURI, dbName2, tblName2, null);
    writer = new DelimitedInputWriter(fieldNames,",", endPt);
-    connection = endPt.newConnection(true);
+    connection = endPt.newConnection(true, "UT_" + Thread.currentThread().getName());

    // 1st Txn
    txnBatch = connection.fetchTransactionBatch(10, writer);
@@ -758,7 +761,7 @@ public void testTransactionBatchCommit_Json() throws Exception {
    HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName, partitionVals);
    StrictJsonWriter writer = new StrictJsonWriter(endPt);
-    StreamingConnection connection = endPt.newConnection(true);
+    StreamingConnection connection = endPt.newConnection(true, "UT_" + Thread.currentThread().getName());

    // 1st Txn
    TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer);
@@ -786,7 +789,7 @@ public void testRemainingTransactions() throws Exception {
    HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName, partitionVals);
    DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt);
-    StreamingConnection connection = endPt.newConnection(true);
+    StreamingConnection connection = endPt.newConnection(true, "UT_" + Thread.currentThread().getName());

    // 1) test with txn.Commit()
    TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer);
@@ -843,7 +846,7 @@ public void testTransactionBatchAbort() throws Exception {
    HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName, partitionVals);
    DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt);
-    StreamingConnection connection = endPt.newConnection(false);
+    StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName());

    TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer);
@@ -867,16 +870,21 @@ public void testTransactionBatchAbort() throws Exception {

  @Test
  public void testTransactionBatchAbortAndCommit() throws Exception {
-
+    String agentInfo = "UT_" + Thread.currentThread().getName();
    HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName, partitionVals);
    DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt);
-    StreamingConnection connection = endPt.newConnection(false);
+    StreamingConnection connection = endPt.newConnection(false, agentInfo);

    TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer);
    txnBatch.beginNextTransaction();
    txnBatch.write("1,Hello streaming".getBytes());
    txnBatch.write("2,Welcome to streaming".getBytes());
+    ShowLocksResponse resp = msClient.showLocks(new ShowLocksRequest());
+    Assert.assertEquals("LockCount", 1, resp.getLocksSize());
+    Assert.assertEquals("LockType", LockType.SHARED_READ, resp.getLocks().get(0).getType());
+    Assert.assertEquals("LockState", LockState.ACQUIRED, resp.getLocks().get(0).getState());
+    Assert.assertEquals("AgentInfo", agentInfo, resp.getLocks().get(0).getAgentInfo());
    txnBatch.abort();

    checkNothingWritten(partLoc);
@@ -901,7 +909,7 @@ public void testMultipleTransactionBatchCommits() throws Exception {
    HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName, partitionVals);
    DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt);
-    StreamingConnection connection = endPt.newConnection(true);
+    StreamingConnection connection = endPt.newConnection(true, "UT_" + Thread.currentThread().getName());

    TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer);
    txnBatch.beginNextTransaction();
@@ -950,7 +958,7 @@ public void testInterleavedTransactionBatchCommits() throws Exception {
    HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName, partitionVals);
    DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames, ",", endPt);
-    StreamingConnection connection = endPt.newConnection(false);
+    StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName());

    // Acquire 1st Txn Batch
    TransactionBatch txnBatch1 = connection.fetchTransactionBatch(10, writer);
@@ -1017,7 +1025,7 @@ public void testInterleavedTransactionBatchCommits() throws Exception {
    WriterThd(HiveEndPoint ep, String data) throws Exception {
      super("Writer_" + data);
      writer = new DelimitedInputWriter(fieldNames, ",", ep);
-      conn = ep.newConnection(false);
+      conn = ep.newConnection(false, "UT_" + Thread.currentThread().getName());
      this.data = data;
      setUncaughtExceptionHandler(new UncaughtExceptionHandler() {
        @Override
@@ -1141,6 +1149,7 @@ private static SampleRec deserializeInner(Object row, StructObjectInspector insp
  @Test
  public void testBucketing() throws Exception {
+    String agentInfo = "UT_" + Thread.currentThread().getName();
    dropDB(msClient, dbName3);
    dropDB(msClient, dbName4);

@@ -1166,7 +1175,7 @@ public void testBucketing() throws Exception {
    // 2) Insert data into both tables
    HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName3, tblName3, null);
    DelimitedInputWriter writer = new DelimitedInputWriter(colNames,",", endPt);
-    StreamingConnection connection = endPt.newConnection(false);
+    StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName());

    TransactionBatch txnBatch = connection.fetchTransactionBatch(2, writer);
    txnBatch.beginNextTransaction();
@@ -1179,7 +1188,7 @@ public void testBucketing() throws Exception {

    HiveEndPoint endPt2 = new HiveEndPoint(metaStoreURI, dbName4, tblName4, null);
    DelimitedInputWriter writer2 = new DelimitedInputWriter(colNames2,",", endPt2);
-    StreamingConnection connection2 = endPt2.newConnection(false);
+    StreamingConnection connection2 = endPt2.newConnection(false, "UT_" + Thread.currentThread().getName());

    TransactionBatch txnBatch2 = connection2.fetchTransactionBatch(2, writer2);
    txnBatch2.beginNextTransaction();
@@ -1217,6 +1226,7 @@ private void runCmdOnDriver(String cmd) throws QueryFailedException {
  @Test
  public void testFileDump() throws Exception {
+    String agentInfo = "UT_" + Thread.currentThread().getName();
    dropDB(msClient, dbName3);
    dropDB(msClient, dbName4);

@@ -1242,7 +1252,7 @@ public void testFileDump() throws Exception {
    // 2) Insert data into both tables
    HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName3, tblName3, null);
    DelimitedInputWriter writer = new DelimitedInputWriter(colNames,",", endPt);
-    StreamingConnection connection = endPt.newConnection(false);
+    StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName());

    TransactionBatch txnBatch = connection.fetchTransactionBatch(2, writer);
    txnBatch.beginNextTransaction();
@@ -1270,7 +1280,7 @@ public void testFileDump() throws Exception {

    HiveEndPoint endPt2 = new HiveEndPoint(metaStoreURI, dbName4, tblName4, null);
    DelimitedInputWriter writer2 = new DelimitedInputWriter(colNames2,",", endPt2);
-    StreamingConnection connection2 = endPt2.newConnection(false);
+    StreamingConnection connection2 = endPt2.newConnection(false, "UT_" + Thread.currentThread().getName());

    TransactionBatch txnBatch2 = connection2.fetchTransactionBatch(2, writer2);
    txnBatch2.beginNextTransaction();
@@ -1314,7 +1324,7 @@ public void testFileDumpCorruptDataFiles() throws Exception {
    // 2) Insert data into both tables
    HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName3, tblName3, null);
    DelimitedInputWriter writer = new DelimitedInputWriter(colNames,",", endPt);
-    StreamingConnection connection = endPt.newConnection(false);
+    StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName());

    // we need side file for this test, so we create 2 txn batch and test with only one
    TransactionBatch txnBatch = connection.fetchTransactionBatch(2, writer);
@@ -1440,7 +1450,7 @@ public void testFileDumpCorruptSideFiles() throws Exception {
    // 2) Insert data into both tables
    HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName3, tblName3, null);
    DelimitedInputWriter writer = new DelimitedInputWriter(colNames,",", endPt);
-    StreamingConnection connection = endPt.newConnection(false);
+    StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName());

    TransactionBatch txnBatch = connection.fetchTransactionBatch(2, writer);
    txnBatch.beginNextTransaction();
@@ -1655,6 +1665,7 @@ private void recordOffsets(final HiveConf conf, final String dbLocation,
  @Test
  public void testErrorHandling() throws Exception {
+    String agentInfo = "UT_" + Thread.currentThread().getName();
    runCmdOnDriver("create database testErrors");
    runCmdOnDriver("use testErrors");
    runCmdOnDriver("create table T(a int, b int) clustered by (b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
@@ -1662,7 +1673,7 @@ public void testErrorHandling() throws Exception {
    HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, "testErrors", "T", null);
    DelimitedInputWriter innerWriter = new DelimitedInputWriter("a,b".split(","),",", endPt);
    FaultyWriter writer = new FaultyWriter(innerWriter);
-    StreamingConnection connection = endPt.newConnection(false);
+    StreamingConnection connection = endPt.newConnection(false, agentInfo);

    TransactionBatch txnBatch = connection.fetchTransactionBatch(2, writer);
    txnBatch.close();
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
index ad32074..290241e 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
@@ -427,7 +427,7 @@ public void testStatsAfterCompactionPartTbl() throws Exception {
     * and starts it's own CliSessionState and then closes it, which removes it from ThreadLoacal;
     * thus the session
     * created in this class is gone after this; I fixed it in HiveEndPoint*/
-    StreamingConnection connection = endPt.newConnection(true);
+    StreamingConnection connection = endPt.newConnection(true, "UT_" + Thread.currentThread().getName());

    TransactionBatch txnBatch = connection.fetchTransactionBatch(2, writer);
    txnBatch.beginNextTransaction();
@@ -626,7 +626,7 @@ public void minorCompactWhileStreaming() throws Exception {

    HiveEndPoint endPt = new HiveEndPoint(null, dbName, tblName, null);
    DelimitedInputWriter writer = new DelimitedInputWriter(new String[] {"a","b"},",", endPt);
-    StreamingConnection connection = endPt.newConnection(false);
+    StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName());
    try {
      // Write a couple of batches
      for (int i = 0; i < 2; i++) {
@@ -688,7 +688,7 @@ public void majorCompactWhileStreaming() throws Exception {

    HiveEndPoint endPt = new HiveEndPoint(null, dbName, tblName, null);
    DelimitedInputWriter writer = new DelimitedInputWriter(new String[] {"a","b"},",", endPt);
-    StreamingConnection connection = endPt.newConnection(false);
+    StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName());
    try {
      // Write a couple of batches
      for (int i = 0; i < 2; i++) {
@@ -728,6 +728,7 @@ public void majorCompactWhileStreaming() throws Exception {
  @Test
  public void minorCompactAfterAbort() throws Exception {
+    String agentInfo = "UT_" + Thread.currentThread().getName();
    String dbName = "default";
    String tblName = "cws";
    List colNames = Arrays.asList("a", "b");
@@ -740,7 +741,7 @@ public void minorCompactAfterAbort() throws Exception {

    HiveEndPoint endPt = new HiveEndPoint(null, dbName, tblName, null);
    DelimitedInputWriter writer = new DelimitedInputWriter(new String[] {"a","b"},",", endPt);
-    StreamingConnection connection = endPt.newConnection(false);
+    StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName());
    try {
      // Write a couple of batches
      for (int i = 0; i < 2; i++) {
@@ -805,7 +806,7 @@ public void majorCompactAfterAbort() throws Exception {

    HiveEndPoint endPt = new HiveEndPoint(null, dbName, tblName, null);
    DelimitedInputWriter writer = new DelimitedInputWriter(new String[] {"a","b"},",", endPt);
-    StreamingConnection connection = endPt.newConnection(false);
+    StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName());
    try {
      // Write a couple of batches
      for (int i = 0; i < 2; i++) {
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index 82d685d..96bb5f4 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -959,16 +959,25 @@ private ConnectionLockIdPair enqueueLockWithRetry(LockRequest rqst) throws NoSuc
        }
        long now = getDbTime(dbConn);
        s = "insert into HIVE_LOCKS " +
-          " (hl_lock_ext_id, hl_lock_int_id, hl_txnid, hl_db, hl_table, " +
-          "hl_partition, hl_lock_state, hl_lock_type, hl_last_heartbeat, hl_user, hl_host)" +
-          " values (" + extLockId + ", " +
-          +intLockId + "," + txnid + ", '" +
-          dbName + "', " + (tblName == null ? "null" : "'" + tblName + "'")
-          + ", " + (partName == null ? "null" : "'" + partName + "'") +
-          ", '" + LOCK_WAITING + "', " + "'" + lockChar + "', " +
+          "(hl_lock_ext_id, hl_lock_int_id, hl_txnid, " +
+          "hl_db, " +
+          "hl_table, " +
+          "hl_partition, " +
+          "hl_lock_state, hl_lock_type, " +
+          "hl_last_heartbeat, " +
+          "hl_user, " +
+          "hl_host, " +
+          "hl_agent_info) values(" +
+          extLockId + ", " + intLockId + "," + txnid + ", " +
+          quoteString(dbName) + ", " +
+          valueOrNullLiteral(tblName) + ", " +
+          valueOrNullLiteral(partName) + ", " +
+          quoteChar(LOCK_WAITING) + ", " + quoteChar(lockChar) + ", " +
          //for locks associated with a txn, we always heartbeat txn and timeout based on that
-          (isValidTxn(txnid) ? 0 : now) + ", '" +
-          rqst.getUser() + "', '" + rqst.getHostname() + "')";
+          (isValidTxn(txnid) ? 0 : now) + ", " +
+          valueOrNullLiteral(rqst.getUser()) + ", " +
+          valueOrNullLiteral(rqst.getHostname()) + ", " +
+          valueOrNullLiteral(rqst.getAgentInfo()) + ")";
        LOG.debug("Going to execute update <" + s + ">");
        stmt.executeUpdate(s);
      }
@@ -1174,7 +1183,7 @@ public ShowLocksResponse showLocks(ShowLocksRequest rqst) throws MetaException {

      String s = "select hl_lock_ext_id, hl_txnid, hl_db, hl_table, hl_partition, hl_lock_state, " +
        "hl_lock_type, hl_last_heartbeat, hl_acquired_at, hl_user, hl_host, hl_lock_int_id," +
-        "hl_blockedby_ext_id, hl_blockedby_int_id from HIVE_LOCKS";
+        "hl_blockedby_ext_id, hl_blockedby_int_id, hl_agent_info from HIVE_LOCKS";

      // Some filters may have been specified in the SHOW LOCKS statement. Add them to the query.
      String dbName = rqst.getDbname();
@@ -1239,6 +1248,7 @@ public ShowLocksResponse showLocks(ShowLocksRequest rqst) throws MetaException {
        if(!rs.wasNull()) {
          e.setBlockedByIntId(id);
        }
+        e.setAgentInfo(rs.getString(15));
        sortedList.add(new LockInfoExt(e));
      }
      LOG.debug("Going to rollback");
@@ -3178,6 +3188,13 @@ private String addForUpdateClause(String selectStatement) throws MetaException {
      throw new MetaException(msg);
    }
  }
+  /**
+   * Useful for building SQL strings
+   * @param value may be {@code null}
+   */
+  private static String valueOrNullLiteral(String value) {
+    return value == null ? "null" : quoteString(value);
+  }
  static String quoteString(String input) {
    return "'" + input + "'";
  }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
index 717589a..139ecb4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
@@ -2577,6 +2577,8 @@ public static void dumpLockInfo(DataOutputStream os, ShowLocksResponse rsp) thro
    os.writeBytes("User");
    os.write(separator);
    os.writeBytes("Hostname");
+    os.write(separator);
+    os.writeBytes("Agent Info");
    os.write(terminator);

    List locks = rsp.getLocks();
@@ -2616,6 +2618,8 @@ public static void dumpLockInfo(DataOutputStream os, ShowLocksResponse rsp) thro
      os.write(separator);
      os.writeBytes(lock.getHostname());
      os.write(separator);
+      os.writeBytes(lock.getAgentInfo() == null ? "NULL" : lock.getAgentInfo());
+      os.write(separator);
      os.write(terminator);
    }
  }
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
index 4782213..89fa20e 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
@@ -1376,4 +1376,16 @@ public void testMultiInsert() throws Exception {
    ShowLocksResponse rsp = ((DbLockManager)txnMgr.getLockManager()).getLocks(rqst);
    return rsp.getLocks();
  }
+
+  @Test
+  public void testShowLocksAgentInfo() throws Exception {
+    CommandProcessorResponse cpr = driver.run("create table if not exists XYZ (a int, b int)");
+    checkCmdOnDriver(cpr);
+    checkCmdOnDriver(driver.compileAndRespond("select a from XYZ where b = 8"));
+    txnMgr.acquireLocks(driver.getPlan(), ctx, "ZYZ");
+    List locks = getLocks(txnMgr);
+    Assert.assertEquals("Unexpected lock count", 1, locks.size());
+    checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "XYZ", null, locks.get(0));
+    Assert.assertEquals("Wrong AgentInfo", driver.getPlan().getQueryId(), locks.get(0).getAgentInfo());
+  }
 }
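Note (not part of the patch): a minimal, hypothetical usage sketch of the agentInfo-aware streaming API added above. It assumes a reachable metastore at thrift://localhost:9083 and an existing bucketed, transactional table default.alerts(id int, msg string); the URI, table, and column names are illustrative only. The only change a caller needs is to pass an identifying string to newConnection so that SHOW LOCKS (via the new hl_agent_info column) can attribute the resulting locks.

// Illustrative only; metastore URI, table, and columns are assumptions, not part of the patch.
import org.apache.hive.hcatalog.streaming.DelimitedInputWriter;
import org.apache.hive.hcatalog.streaming.HiveEndPoint;
import org.apache.hive.hcatalog.streaming.StreamingConnection;
import org.apache.hive.hcatalog.streaming.TransactionBatch;

public class AgentInfoUsageSketch {
  public static void main(String[] args) throws Exception {
    // Something that can be correlated with application log files and monitoring consoles.
    String agentInfo = "example-ingest-agent-" + Thread.currentThread().getName();

    HiveEndPoint endPt = new HiveEndPoint("thrift://localhost:9083", "default", "alerts", null);
    DelimitedInputWriter writer = new DelimitedInputWriter(new String[]{"id", "msg"}, ",", endPt);

    // New overload introduced by this patch: agentInfo is carried into each lock request.
    StreamingConnection conn = endPt.newConnection(true, agentInfo);
    try {
      TransactionBatch txnBatch = conn.fetchTransactionBatch(10, writer);
      txnBatch.beginNextTransaction();
      txnBatch.write("1,hello".getBytes());
      txnBatch.commit();
      txnBatch.close();
    } finally {
      conn.close();
    }
  }
}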