diff --git hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
index cb64fff..017f565 100644
--- hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
+++ hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
@@ -97,21 +97,48 @@ public HiveEndPoint(String metaStoreUri
   /**
+   * @deprecated Use {@link #newConnection(boolean, String)}
+   */
+  public StreamingConnection newConnection(final boolean createPartIfNotExists)
+    throws ConnectionError, InvalidPartition, InvalidTable, PartitionCreationFailed
+    , ImpersonationFailed , InterruptedException {
+    return newConnection(createPartIfNotExists, null, null, null);
+  }
+  /**
+   * @deprecated Use {@link #newConnection(boolean, HiveConf, String)}
+   */
+  public StreamingConnection newConnection(final boolean createPartIfNotExists, HiveConf conf)
+    throws ConnectionError, InvalidPartition, InvalidTable, PartitionCreationFailed
+    , ImpersonationFailed , InterruptedException {
+    return newConnection(createPartIfNotExists, conf, null, null);
+  }
+  /**
+   * @deprecated Use {@link #newConnection(boolean, HiveConf, UserGroupInformation, String)}
+   */
+  public StreamingConnection newConnection(final boolean createPartIfNotExists, final HiveConf conf,
+                                           final UserGroupInformation authenticatedUser)
+    throws ConnectionError, InvalidPartition,
+    InvalidTable, PartitionCreationFailed, ImpersonationFailed , InterruptedException {
+    return newConnection(createPartIfNotExists, conf, authenticatedUser, null);
+  }
+  /**
    * Acquire a new connection to MetaStore for streaming
    * @param createPartIfNotExists If true, the partition specified in the endpoint
    *                              will be auto created if it does not exist
+   * @param agentInfo should uniquely identify the process/entity that is using this batch.  This
+   *                  should be something that can be correlated with calling application log files
+   *                  and/or monitoring consoles.
    * @return
    * @throws ConnectionError if problem connecting
    * @throws InvalidPartition if specified partition is not valid (createPartIfNotExists = false)
    * @throws ImpersonationFailed if not able to impersonate 'proxyUser'
-   * @throws IOException if there was an I/O error when acquiring connection
    * @throws PartitionCreationFailed if failed to create partition
    * @throws InterruptedException
    */
-  public StreamingConnection newConnection(final boolean createPartIfNotExists)
-    throws ConnectionError, InvalidPartition, InvalidTable, PartitionCreationFailed
-    , ImpersonationFailed , InterruptedException {
-    return newConnection(createPartIfNotExists, null, null);
+  public StreamingConnection newConnection(final boolean createPartIfNotExists, String agentInfo)
+    throws ConnectionError, InvalidPartition, InvalidTable, PartitionCreationFailed
+    , ImpersonationFailed , InterruptedException {
+    return newConnection(createPartIfNotExists, null, null, agentInfo);
   }

   /**
@@ -119,18 +146,20 @@ public StreamingConnection newConnection(final boolean createPartIfNotExists)
    * @param createPartIfNotExists If true, the partition specified in the endpoint
    *                              will be auto created if it does not exist
    * @param conf HiveConf object, set it to null if not using advanced hive settings.
+   * @param agentInfo should uniquely identify the process/entity that is using this batch.  This
+   *                  should be something that can be correlated with calling application log files
+   *                  and/or monitoring consoles.
    * @return
    * @throws ConnectionError if problem connecting
    * @throws InvalidPartition if specified partition is not valid (createPartIfNotExists = false)
    * @throws ImpersonationFailed if not able to impersonate 'proxyUser'
-   * @throws IOException if there was an I/O error when acquiring connection
    * @throws PartitionCreationFailed if failed to create partition
    * @throws InterruptedException
    */
-  public StreamingConnection newConnection(final boolean createPartIfNotExists, HiveConf conf)
+  public StreamingConnection newConnection(final boolean createPartIfNotExists, HiveConf conf, String agentInfo)
     throws ConnectionError, InvalidPartition, InvalidTable, PartitionCreationFailed
     , ImpersonationFailed , InterruptedException {
-    return newConnection(createPartIfNotExists, conf, null);
+    return newConnection(createPartIfNotExists, conf, null, agentInfo);
   }

   /**
@@ -144,21 +173,23 @@ public StreamingConnection newConnection(final boolean createPartIfNotExists, Hi
    * @param conf HiveConf object to be used for the connection. Can be null.
    * @param authenticatedUser UserGroupInformation object obtained from successful authentication.
    *                          Uses non-secure mode if this argument is null.
+   * @param agentInfo should uniquely identify the process/entity that is using this batch.  This
+   *                  should be something that can be correlated with calling application log files
+   *                  and/or monitoring consoles.
    * @return
    * @throws ConnectionError if there is a connection problem
    * @throws InvalidPartition if specified partition is not valid (createPartIfNotExists = false)
    * @throws ImpersonationFailed if not able to impersonate 'username'
-   * @throws IOException if there was an I/O error when acquiring connection
    * @throws PartitionCreationFailed if failed to create partition
    * @throws InterruptedException
    */
   public StreamingConnection newConnection(final boolean createPartIfNotExists, final HiveConf conf,
-                                           final UserGroupInformation authenticatedUser)
+                                           final UserGroupInformation authenticatedUser, final String agentInfo)
     throws ConnectionError, InvalidPartition,
     InvalidTable, PartitionCreationFailed, ImpersonationFailed , InterruptedException {
     if( authenticatedUser==null ) {
-      return newConnectionImpl(authenticatedUser, createPartIfNotExists, conf);
+      return newConnectionImpl(authenticatedUser, createPartIfNotExists, conf, agentInfo);
     }
     try {
@@ -168,7 +199,7 @@ public StreamingConnection newConnection(final boolean createPartIfNotExists, fi
             public StreamingConnection run()
                     throws ConnectionError, InvalidPartition, InvalidTable
                     , PartitionCreationFailed {
-              return newConnectionImpl(authenticatedUser, createPartIfNotExists, conf);
+              return newConnectionImpl(authenticatedUser, createPartIfNotExists, conf, agentInfo);
             }
           }
       );
@@ -178,10 +209,10 @@ public StreamingConnection run()
   }

   private StreamingConnection newConnectionImpl(UserGroupInformation ugi,
-                                                boolean createPartIfNotExists, HiveConf conf)
+                                                boolean createPartIfNotExists, HiveConf conf, String agentInfo)
           throws ConnectionError, InvalidPartition, InvalidTable
           , PartitionCreationFailed {
-    return new ConnectionImpl(this, ugi, conf, createPartIfNotExists);
+    return new ConnectionImpl(this, ugi, conf, createPartIfNotExists, agentInfo);
   }

   private static UserGroupInformation getUserGroupInfo(String user)
@@ -250,6 +281,7 @@ public String toString() {
     private final UserGroupInformation ugi;
     private final String username;
     private final boolean secureMode;
+    private final String agentInfo;

     /**
      * @param endPoint end point to connect to
@@ -262,11 +294,12 @@ public String toString() {
      * @throws PartitionCreationFailed if createPart=true and not able to create partition
      */
     private ConnectionImpl(HiveEndPoint endPoint, UserGroupInformation ugi,
-                           HiveConf conf, boolean createPart)
+                           HiveConf conf, boolean createPart, String agentInfo)
             throws ConnectionError, InvalidPartition, InvalidTable
             , PartitionCreationFailed {
       this.endPt = endPoint;
       this.ugi = ugi;
+      this.agentInfo = agentInfo;
       this.username = ugi==null ? System.getProperty("user.name") : ugi.getShortUserName();
       if (conf==null) {
         conf = HiveEndPoint.createHiveConf(this.getClass(), endPoint.metaStoreUri);
@@ -397,7 +430,7 @@ private TransactionBatch fetchTransactionBatchImpl(int numTransactions,
                                                        RecordWriter recordWriter)
             throws StreamingException, TransactionBatchUnAvailable, InterruptedException {
       return new TransactionBatchImpl(username, ugi, endPt, numTransactions, msClient
-              , recordWriter);
+              , recordWriter, agentInfo);
     }

@@ -530,6 +563,7 @@ private static IMetaStoreClient getMetaStoreClient(HiveEndPoint endPoint, HiveCo
      * file backing this batch any more.  This guards important public methods
      */
     private volatile boolean isClosed = false;
+    private final String agentInfo;

     /**
      * Represents a batch of transactions acquired from MetaStore
@@ -537,8 +571,9 @@ private static IMetaStoreClient getMetaStoreClient(HiveEndPoint endPoint, HiveCo
      * @throws StreamingException if failed to create new RecordUpdater for batch
      * @throws TransactionBatchUnAvailable if failed to acquire a new Transaction batch
      */
-    private TransactionBatchImpl(final String user, UserGroupInformation ugi, HiveEndPoint endPt
-            , final int numTxns, final IMetaStoreClient msClient, RecordWriter recordWriter)
+    private TransactionBatchImpl(final String user, UserGroupInformation ugi, HiveEndPoint endPt,
+            final int numTxns, final IMetaStoreClient msClient, RecordWriter recordWriter,
+            String agentInfo)
             throws StreamingException, TransactionBatchUnAvailable, InterruptedException {
       boolean success = false;
       try {
@@ -554,6 +589,7 @@ private TransactionBatchImpl(final String user, UserGroupInformation ugi, HiveEn
         this.endPt = endPt;
         this.msClient = msClient;
         this.recordWriter = recordWriter;
+        this.agentInfo = agentInfo;

         txnIds = openTxnImpl(msClient, user, numTxns, ugi);

@@ -628,7 +664,7 @@ private void beginNextTransactionImpl() throws TransactionError {
                 " current batch for end point : " + endPt);
       ++currentTxnIndex;
       state = TxnState.OPEN;
-      lockRequest = createLockRequest(endPt, partNameForLock, username, getCurrentTxnId());
+      lockRequest = createLockRequest(endPt, partNameForLock, username, getCurrentTxnId(), agentInfo);
       try {
         LockResponse res = msClient.lock(lockRequest);
         if (res.getState() != LockState.ACQUIRED) {
@@ -957,8 +993,9 @@ public Void run() throws StreamingException {
     }

     private static LockRequest createLockRequest(final HiveEndPoint hiveEndPoint,
-            String partNameForLock, String user, long txnId) {
-      LockRequestBuilder rqstBuilder = new LockRequestBuilder();
+            String partNameForLock, String user, long txnId, String agentInfo) {
+      LockRequestBuilder rqstBuilder = agentInfo == null ?
+        new LockRequestBuilder() : new LockRequestBuilder(agentInfo);
       rqstBuilder.setUser(user);
       rqstBuilder.setTransactionId(txnId);
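For reference, a minimal caller-side sketch of the connection API added above (not part of the patch; the metastore URI, database, table, columns, and agentInfo value are made-up examples, and checked-exception handling is elided):

    // Hypothetical streaming client; agentInfo should be a stable, human-readable id
    // that operators can correlate with this application's logs and monitoring consoles.
    HiveEndPoint endPt = new HiveEndPoint("thrift://metastore-host:9083", "mydb", "alerts", null);
    DelimitedInputWriter writer = new DelimitedInputWriter(new String[]{"id", "msg"}, ",", endPt);
    StreamingConnection conn =
        endPt.newConnection(true, "ingest-agent_host42_" + Thread.currentThread().getName());
    try {
      TransactionBatch txnBatch = conn.fetchTransactionBatch(10, writer);
      txnBatch.beginNextTransaction();
      txnBatch.write("1,hello".getBytes());
      txnBatch.commit();
      txnBatch.close();
    } finally {
      conn.close();
    }

Passing null for agentInfo leaves the lock without agent info, which is the behavior the deprecated overloads above preserve by forwarding null.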
diff --git hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/StreamingIntegrationTester.java hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/StreamingIntegrationTester.java
index bf2cc63..3acfa35 100644
--- hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/StreamingIntegrationTester.java
+++ hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/StreamingIntegrationTester.java
@@ -276,7 +276,7 @@ private void go() {
     public void run() {
       StreamingConnection conn = null;
       try {
-        conn = endPoint.newConnection(true);
+        conn = endPoint.newConnection(true, "UT_" + Thread.currentThread().getName());
         RecordWriter writer = new DelimitedInputWriter(cols, ",", endPoint);
         for (int i = 0; i < batches; i++) {
diff --git hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
index 4d2a2ee..84e559d 100644
--- hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
+++ hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
@@ -49,6 +49,8 @@ import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse;
+import org.apache.hadoop.hive.metastore.api.LockState;
+import org.apache.hadoop.hive.metastore.api.LockType;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.ShowLocksRequest;
@@ -302,7 +304,7 @@ public void testBucketingWhereBucketColIsNotFirstCol() throws Exception {
       "ss_customer_sk", "ss_cdemo_sk", "ss_hdemo_sk", "ss_addr_sk", "ss_store_sk", "ss_promo_sk", "ss_ticket_number", "ss_quantity", "ss_wholesale_cost", "ss_list_price", "ss_sales_price", "ss_ext_discount_amt", "ss_ext_sales_price", "ss_ext_wholesale_cost", "ss_ext_list_price", "ss_ext_tax", "ss_coupon_amt", "ss_net_paid", "ss_net_paid_inc_tax", "ss_net_profit"},",", endPt);
-    StreamingConnection connection = endPt.newConnection(false, null);//should this really be null?
+    StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName());
     TransactionBatch txnBatch = connection.fetchTransactionBatch(2, writer);
     txnBatch.beginNextTransaction();
@@ -376,7 +378,7 @@ public void testStreamBucketingMatchesRegularBucketing() throws Exception {
     HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, "testBucketing3", "streamedtable", null);
     String[] colNames1 = new String[] { "key1", "key2", "data" };
     DelimitedInputWriter wr = new DelimitedInputWriter(colNames1,",", endPt);
-    StreamingConnection connection = endPt.newConnection(false);
+    StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName());
     TransactionBatch txnBatch = connection.fetchTransactionBatch(2, wr);
     txnBatch.beginNextTransaction();
@@ -427,14 +429,14 @@ public void testTableValidation() throws Exception {
     try {
       HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, "testBucketing3", "validation1", null);
-      endPt.newConnection(false);
+      endPt.newConnection(false, "UT_" + Thread.currentThread().getName());
       Assert.assertTrue("InvalidTable exception was not thrown", false);
     } catch (InvalidTable e) {
       // expecting this exception
     }
     try {
       HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, "testBucketing3", "validation2", null);
-      endPt.newConnection(false);
+      endPt.newConnection(false, "UT_" + Thread.currentThread().getName());
       Assert.assertTrue("InvalidTable exception was not thrown", false);
     } catch (InvalidTable e) {
       // expecting this exception
@@ -498,17 +500,17 @@ private void checkNothingWritten(Path partitionPath) throws Exception {
   public void testEndpointConnection() throws Exception {
     // For partitioned table, partitionVals are specified
     HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName, partitionVals);
-    StreamingConnection connection = endPt.newConnection(false, null); //shouldn't throw
+    StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName()); //shouldn't throw
     connection.close();

     // For unpartitioned table, partitionVals are not specified
     endPt = new HiveEndPoint(metaStoreURI, dbName2, tblName2, null);
-    endPt.newConnection(false, null).close(); // should not throw
+    endPt.newConnection(false, "UT_" + Thread.currentThread().getName()).close(); // should not throw

     // For partitioned table, partitionVals are not specified
     try {
       endPt = new HiveEndPoint(metaStoreURI, dbName, tblName, null);
-      connection = endPt.newConnection(true);
+      connection = endPt.newConnection(true, "UT_" + Thread.currentThread().getName());
       Assert.assertTrue("ConnectionError was not thrown", false);
       connection.close();
     } catch (ConnectionError e) {
@@ -520,7 +522,7 @@ public void testEndpointConnection() throws Exception {
     // For unpartitioned table, partition values are specified
     try {
       endPt = new HiveEndPoint(metaStoreURI, dbName2, tblName2, partitionVals);
-      connection = endPt.newConnection(false);
+      connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName());
       Assert.assertTrue("ConnectionError was not thrown", false);
       connection.close();
     } catch (ConnectionError e) {
@@ -548,7 +550,7 @@ public void testAddPartition() throws Exception {
     }

     // Create partition
-    Assert.assertNotNull(endPt.newConnection(true, null));
+    Assert.assertNotNull(endPt.newConnection(true, "UT_" + Thread.currentThread().getName()));

     // Ensure partition is present
     Partition p = msClient.getPartition(endPt.database, endPt.table, endPt.partitionVals);
@@ -561,7 +563,7 @@ public void testTransactionBatchEmptyCommit() throws Exception {
     HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName, partitionVals);
     DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt);
-    StreamingConnection connection = endPt.newConnection(false, null);
+    StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName());
     TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer);
@@ -575,7 +577,7 @@ public void testTransactionBatchEmptyCommit() throws Exception {
     // 2) To unpartitioned table
     endPt = new HiveEndPoint(metaStoreURI, dbName2, tblName2, null);
     writer = new DelimitedInputWriter(fieldNames2,",", endPt);
-    connection = endPt.newConnection(false, null);
+    connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName());
     txnBatch = connection.fetchTransactionBatch(10, writer);
     txnBatch.beginNextTransaction();
@@ -594,7 +596,7 @@ public void testTransactionBatchEmptyCommit() throws Exception {
   public void testTimeOutReaper() throws Exception {
     HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName2, tblName2, null);
     DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames2,",", endPt);
-    StreamingConnection connection = endPt.newConnection(false, null);
+    StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName());
     TransactionBatch txnBatch = connection.fetchTransactionBatch(5, writer);
     txnBatch.beginNextTransaction();
@@ -640,7 +642,7 @@ public void testTimeOutReaper() throws Exception {
   public void testHeartbeat() throws Exception {
     HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName2, tblName2, null);
     DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames2,",", endPt);
-    StreamingConnection connection = endPt.newConnection(false, null);
+    StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName());
     TransactionBatch txnBatch = connection.fetchTransactionBatch(5, writer);
     txnBatch.beginNextTransaction();
@@ -669,7 +671,7 @@ public void testTransactionBatchEmptyAbort() throws Exception {
     HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName, partitionVals);
     DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt);
-    StreamingConnection connection = endPt.newConnection(true);
+    StreamingConnection connection = endPt.newConnection(true, "UT_" + Thread.currentThread().getName());
     TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer);
     txnBatch.beginNextTransaction();
@@ -682,7 +684,7 @@ public void testTransactionBatchEmptyAbort() throws Exception {
     // 2) to unpartitioned table
     endPt = new HiveEndPoint(metaStoreURI, dbName2, tblName2, null);
     writer = new DelimitedInputWriter(fieldNames,",", endPt);
-    connection = endPt.newConnection(true);
+    connection = endPt.newConnection(true, "UT_" + Thread.currentThread().getName());
     txnBatch = connection.fetchTransactionBatch(10, writer);
     txnBatch.beginNextTransaction();
@@ -698,7 +700,7 @@ public void testTransactionBatchCommit_Delimited() throws Exception {
     HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName, partitionVals);
     DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt);
-    StreamingConnection connection = endPt.newConnection(true);
+    StreamingConnection connection = endPt.newConnection(true, "UT_" + Thread.currentThread().getName());

     // 1st Txn
     TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer);
@@ -738,7 +740,7 @@ public void testTransactionBatchCommit_Delimited() throws Exception {
     // To Unpartitioned table
     endPt = new HiveEndPoint(metaStoreURI, dbName2, tblName2, null);
     writer = new DelimitedInputWriter(fieldNames,",", endPt);
-    connection = endPt.newConnection(true);
+    connection = endPt.newConnection(true, "UT_" + Thread.currentThread().getName());

     // 1st Txn
     txnBatch = connection.fetchTransactionBatch(10, writer);
@@ -758,7 +760,7 @@ public void testTransactionBatchCommit_Json() throws Exception {
     HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName, partitionVals);
     StrictJsonWriter writer = new StrictJsonWriter(endPt);
-    StreamingConnection connection = endPt.newConnection(true);
+    StreamingConnection connection = endPt.newConnection(true, "UT_" + Thread.currentThread().getName());

     // 1st Txn
     TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer);
@@ -786,7 +788,7 @@ public void testRemainingTransactions() throws Exception {
     HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName, partitionVals);
     DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt);
-    StreamingConnection connection = endPt.newConnection(true);
+    StreamingConnection connection = endPt.newConnection(true, "UT_" + Thread.currentThread().getName());

     // 1) test with txn.Commit()
     TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer);
@@ -843,7 +845,7 @@ public void testTransactionBatchAbort() throws Exception {
     HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName, partitionVals);
     DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt);
-    StreamingConnection connection = endPt.newConnection(false);
+    StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName());
     TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer);
@@ -867,16 +869,21 @@ public void testTransactionBatchAbort() throws Exception {

   @Test
   public void testTransactionBatchAbortAndCommit() throws Exception {
-
+    String agentInfo = "UT_" + Thread.currentThread().getName();
     HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName, partitionVals);
     DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt);
-    StreamingConnection connection = endPt.newConnection(false);
+    StreamingConnection connection = endPt.newConnection(false, agentInfo);
     TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer);
     txnBatch.beginNextTransaction();
     txnBatch.write("1,Hello streaming".getBytes());
     txnBatch.write("2,Welcome to streaming".getBytes());
+    ShowLocksResponse resp = msClient.showLocks(new ShowLocksRequest());
+    Assert.assertEquals("LockCount", 1, resp.getLocksSize());
+    Assert.assertEquals("LockType", LockType.SHARED_READ, resp.getLocks().get(0).getType());
+    Assert.assertEquals("LockState", LockState.ACQUIRED, resp.getLocks().get(0).getState());
+    Assert.assertEquals("AgentInfo", agentInfo, resp.getLocks().get(0).getAgentInfo());
     txnBatch.abort();

     checkNothingWritten(partLoc);
@@ -901,7 +908,7 @@ public void testMultipleTransactionBatchCommits() throws Exception {
     HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName, partitionVals);
     DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt);
-    StreamingConnection connection = endPt.newConnection(true);
+    StreamingConnection connection = endPt.newConnection(true, "UT_" + Thread.currentThread().getName());
     TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer);
     txnBatch.beginNextTransaction();
@@ -950,7 +957,7 @@ public void testInterleavedTransactionBatchCommits() throws Exception {
     HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName, partitionVals);
     DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames, ",", endPt);
-    StreamingConnection connection = endPt.newConnection(false);
+    StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName());

     // Acquire 1st Txn Batch
     TransactionBatch txnBatch1 = connection.fetchTransactionBatch(10, writer);
@@ -1017,7 +1024,7 @@ public void testInterleavedTransactionBatchCommits() throws Exception {
     WriterThd(HiveEndPoint ep, String data) throws Exception {
       super("Writer_" + data);
       writer = new DelimitedInputWriter(fieldNames, ",", ep);
-      conn = ep.newConnection(false);
+      conn = ep.newConnection(false, "UT_" + Thread.currentThread().getName());
       this.data = data;
       setUncaughtExceptionHandler(new UncaughtExceptionHandler() {
         @Override
@@ -1141,6 +1148,7 @@ private static SampleRec deserializeInner(Object row, StructObjectInspector insp

   @Test
   public void testBucketing() throws Exception {
+    String agentInfo = "UT_" + Thread.currentThread().getName();
     dropDB(msClient, dbName3);
     dropDB(msClient, dbName4);
@@ -1166,7 +1174,7 @@ public void testBucketing() throws Exception {
     // 2) Insert data into both tables
     HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName3, tblName3, null);
     DelimitedInputWriter writer = new DelimitedInputWriter(colNames,",", endPt);
-    StreamingConnection connection = endPt.newConnection(false);
+    StreamingConnection connection = endPt.newConnection(false, agentInfo);
     TransactionBatch txnBatch = connection.fetchTransactionBatch(2, writer);
     txnBatch.beginNextTransaction();
@@ -1179,7 +1187,7 @@ public void testBucketing() throws Exception {
     HiveEndPoint endPt2 = new HiveEndPoint(metaStoreURI, dbName4, tblName4, null);
     DelimitedInputWriter writer2 = new DelimitedInputWriter(colNames2,",", endPt2);
-    StreamingConnection connection2 = endPt2.newConnection(false);
+    StreamingConnection connection2 = endPt2.newConnection(false, agentInfo);
     TransactionBatch txnBatch2 = connection2.fetchTransactionBatch(2, writer2);
     txnBatch2.beginNextTransaction();
@@ -1217,6 +1225,7 @@ private void runCmdOnDriver(String cmd) throws QueryFailedException {

   @Test
   public void testFileDump() throws Exception {
+    String agentInfo = "UT_" + Thread.currentThread().getName();
     dropDB(msClient, dbName3);
     dropDB(msClient, dbName4);
@@ -1242,7 +1251,7 @@ public void testFileDump() throws Exception {
     // 2) Insert data into both tables
     HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName3, tblName3, null);
     DelimitedInputWriter writer = new DelimitedInputWriter(colNames,",", endPt);
-    StreamingConnection connection = endPt.newConnection(false);
+    StreamingConnection connection = endPt.newConnection(false, agentInfo);
     TransactionBatch txnBatch = connection.fetchTransactionBatch(2, writer);
     txnBatch.beginNextTransaction();
@@ -1270,7 +1279,7 @@ public void testFileDump() throws Exception {
     HiveEndPoint endPt2 = new HiveEndPoint(metaStoreURI, dbName4, tblName4, null);
     DelimitedInputWriter writer2 = new DelimitedInputWriter(colNames2,",", endPt2);
-    StreamingConnection connection2 = endPt2.newConnection(false);
+    StreamingConnection connection2 = endPt2.newConnection(false, agentInfo);
     TransactionBatch txnBatch2 = connection2.fetchTransactionBatch(2, writer2);
     txnBatch2.beginNextTransaction();
@@ -1314,7 +1323,7 @@ public void testFileDumpCorruptDataFiles() throws Exception {
     // 2) Insert data into both tables
     HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName3, tblName3, null);
     DelimitedInputWriter writer = new DelimitedInputWriter(colNames,",", endPt);
-    StreamingConnection connection = endPt.newConnection(false);
+    StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName());

     // we need side file for this test, so we create 2 txn batch and test with only one
     TransactionBatch txnBatch = connection.fetchTransactionBatch(2, writer);
@@ -1440,7 +1449,7 @@ public void testFileDumpCorruptSideFiles() throws Exception {
     // 2) Insert data into both tables
     HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName3, tblName3, null);
     DelimitedInputWriter writer = new DelimitedInputWriter(colNames,",", endPt);
-    StreamingConnection connection = endPt.newConnection(false);
+    StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName());
     TransactionBatch txnBatch = connection.fetchTransactionBatch(2, writer);
     txnBatch.beginNextTransaction();
@@ -1655,6 +1664,7 @@ private void recordOffsets(final HiveConf conf, final String dbLocation,

   @Test
   public void testErrorHandling() throws Exception {
+    String agentInfo = "UT_" + Thread.currentThread().getName();
     runCmdOnDriver("create database testErrors");
     runCmdOnDriver("use testErrors");
     runCmdOnDriver("create table T(a int, b int) clustered by (b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
@@ -1662,7 +1672,7 @@ public void testErrorHandling() throws Exception {
     HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, "testErrors", "T", null);
     DelimitedInputWriter innerWriter = new DelimitedInputWriter("a,b".split(","),",", endPt);
     FaultyWriter writer = new FaultyWriter(innerWriter);
-    StreamingConnection connection = endPt.newConnection(false);
+    StreamingConnection connection = endPt.newConnection(false, agentInfo);
     TransactionBatch txnBatch = connection.fetchTransactionBatch(2, writer);
     txnBatch.close();
diff --git itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
index e6ccdbc..ca2a912 100644
--- itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
+++ itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
@@ -430,7 +430,7 @@ public void testStatsAfterCompactionPartTbl() throws Exception {
      * and starts it's own CliSessionState and then closes it, which removes it from ThreadLoacal;
      * thus the session
      * created in this class is gone after this; I fixed it in HiveEndPoint*/
-    StreamingConnection connection = endPt.newConnection(true);
+    StreamingConnection connection = endPt.newConnection(true, "UT_" + Thread.currentThread().getName());
     TransactionBatch txnBatch = connection.fetchTransactionBatch(2, writer);
     txnBatch.beginNextTransaction();
@@ -629,7 +629,7 @@ public void minorCompactWhileStreaming() throws Exception {
     HiveEndPoint endPt = new HiveEndPoint(null, dbName, tblName, null);
     DelimitedInputWriter writer = new DelimitedInputWriter(new String[] {"a","b"},",", endPt);
-    StreamingConnection connection = endPt.newConnection(false);
+    StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName());
     try {
       // Write a couple of batches
       for (int i = 0; i < 2; i++) {
@@ -691,7 +691,7 @@ public void majorCompactWhileStreaming() throws Exception {
     HiveEndPoint endPt = new HiveEndPoint(null, dbName, tblName, null);
     DelimitedInputWriter writer = new DelimitedInputWriter(new String[] {"a","b"},",", endPt);
-    StreamingConnection connection = endPt.newConnection(false);
+    StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName());
     try {
       // Write a couple of batches
       for (int i = 0; i < 2; i++) {
@@ -731,6 +731,7 @@ public void majorCompactWhileStreaming() throws Exception {

   @Test
   public void minorCompactAfterAbort() throws Exception {
+    String agentInfo = "UT_" + Thread.currentThread().getName();
     String dbName = "default";
     String tblName = "cws";
     List colNames = Arrays.asList("a", "b");
@@ -743,7 +744,7 @@ public void minorCompactAfterAbort() throws Exception {
     HiveEndPoint endPt = new HiveEndPoint(null, dbName, tblName, null);
     DelimitedInputWriter writer = new DelimitedInputWriter(new String[] {"a","b"},",", endPt);
-    StreamingConnection connection = endPt.newConnection(false);
+    StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName());
     try {
       // Write a couple of batches
       for (int i = 0; i < 2; i++) {
@@ -808,7 +809,7 @@ public void majorCompactAfterAbort() throws Exception {
     HiveEndPoint endPt = new HiveEndPoint(null, dbName, tblName, null);
     DelimitedInputWriter writer = new DelimitedInputWriter(new String[] {"a","b"},",", endPt);
-    StreamingConnection connection = endPt.newConnection(false);
+    StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName());
     try {
       // Write a couple of batches
       for (int i = 0; i < 2; i++) {
diff --git metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index 9c1b399..57517dc 100644
--- metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -960,16 +960,25 @@ private ConnectionLockIdPair enqueueLockWithRetry(LockRequest rqst) throws NoSuc
         }
         long now = getDbTime(dbConn);
         s = "insert into HIVE_LOCKS " +
-          " (hl_lock_ext_id, hl_lock_int_id, hl_txnid, hl_db, hl_table, " +
-          "hl_partition, hl_lock_state, hl_lock_type, hl_last_heartbeat, hl_user, hl_host)" +
-          " values (" + extLockId + ", " +
-          +intLockId + "," + txnid + ", '" +
-          dbName + "', " + (tblName == null ? "null" : "'" + tblName + "'")
-          + ", " + (partName == null ? "null" : "'" + partName + "'") +
-          ", '" + LOCK_WAITING + "', " + "'" + lockChar + "', " +
+          "(hl_lock_ext_id, hl_lock_int_id, hl_txnid, " +
+          "hl_db, " +
+          "hl_table, " +
+          "hl_partition, " +
+          "hl_lock_state, hl_lock_type, " +
+          "hl_last_heartbeat, " +
+          "hl_user, " +
+          "hl_host, " +
+          "hl_agent_info) values(" +
+          extLockId + ", " + intLockId + "," + txnid + ", " +
+          quoteString(dbName) + ", " +
+          valueOrNullLiteral(tblName) + ", " +
+          valueOrNullLiteral(partName) + ", " +
+          quoteChar(LOCK_WAITING) + ", " + quoteChar(lockChar) + ", " +
           //for locks associated with a txn, we always heartbeat txn and timeout based on that
-          (isValidTxn(txnid) ? 0 : now) + ", '" +
-          rqst.getUser() + "', '" + rqst.getHostname() + "')";
+          (isValidTxn(txnid) ? 0 : now) + ", " +
+          valueOrNullLiteral(rqst.getUser()) + ", " +
+          valueOrNullLiteral(rqst.getHostname()) + ", " +
+          valueOrNullLiteral(rqst.getAgentInfo()) + ")";
         LOG.debug("Going to execute update <" + s + ">");
         stmt.executeUpdate(s);
       }
@@ -1175,7 +1184,7 @@ public ShowLocksResponse showLocks(ShowLocksRequest rqst) throws MetaException {
       String s = "select hl_lock_ext_id, hl_txnid, hl_db, hl_table, hl_partition, hl_lock_state, " +
         "hl_lock_type, hl_last_heartbeat, hl_acquired_at, hl_user, hl_host, hl_lock_int_id," +
-        "hl_blockedby_ext_id, hl_blockedby_int_id from HIVE_LOCKS";
+        "hl_blockedby_ext_id, hl_blockedby_int_id, hl_agent_info from HIVE_LOCKS";

       // Some filters may have been specified in the SHOW LOCKS statement. Add them to the query.
       String dbName = rqst.getDbname();
@@ -1240,6 +1249,7 @@ public ShowLocksResponse showLocks(ShowLocksRequest rqst) throws MetaException {
           if(!rs.wasNull()) {
             e.setBlockedByIntId(id);
           }
+          e.setAgentInfo(rs.getString(15));
           sortedList.add(new LockInfoExt(e));
         }
         LOG.debug("Going to rollback");
@@ -3186,6 +3196,13 @@ private String addForUpdateClause(String selectStatement) throws MetaException {
       throw new MetaException(msg);
     }
   }
+  /**
+   * Useful for building SQL strings
+   * @param value may be {@code null}
+   */
+  private static String valueOrNullLiteral(String value) {
+    return value == null ? "null" : quoteString(value);
+  }
   static String quoteString(String input) {
     return "'" + input + "'";
   }
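Illustrative only (assumed, not part of the patch): the new valueOrNullLiteral helper centralizes the "SQL NULL vs. quoted literal" choice that the removed inline ternaries made for hl_table and hl_partition, and it is reused for hl_user, hl_host, and the new hl_agent_info column in the HIVE_LOCKS insert above.

    // Sketch of the helper's effect; the agentInfo value is an assumed example.
    String tblName = null;                     // e.g. a lock with no table
    String agentInfo = "ingest-agent_host42";
    String fragment = valueOrNullLiteral(tblName) + ", " + valueOrNullLiteral(agentInfo);
    // fragment is: null, 'ingest-agent_host42'   (unquoted SQL NULL vs. quoted literal)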
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
index 755654c..493e3a0 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
@@ -2578,6 +2578,8 @@ public static void dumpLockInfo(DataOutputStream os, ShowLocksResponse rsp) thro
     os.writeBytes("User");
     os.write(separator);
     os.writeBytes("Hostname");
+    os.write(separator);
+    os.writeBytes("Agent Info");
     os.write(terminator);

     List locks = rsp.getLocks();
@@ -2617,6 +2619,8 @@ public static void dumpLockInfo(DataOutputStream os, ShowLocksResponse rsp) thro
         os.write(separator);
         os.writeBytes(lock.getHostname());
         os.write(separator);
+        os.writeBytes(lock.getAgentInfo() == null ? "NULL" : lock.getAgentInfo());
+        os.write(separator);
         os.write(terminator);
       }
     }
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/ShowLocksDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/ShowLocksDesc.java
index 45a86f6..2a855f8 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/ShowLocksDesc.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/ShowLocksDesc.java
@@ -53,8 +53,8 @@
    * Schema for use with db txn manager.
    */
   private static final String newFormatSchema = "lockid,database,table,partition,lock_state," +
-      "blocked_by,lock_type,transaction_id,last_heartbeat,acquired_at,user,hostname#" +
-      "string:string:string:string:string:string:string:string:string:string:string:string";
+      "blocked_by,lock_type,transaction_id,last_heartbeat,acquired_at,user,hostname,agent_info#" +
+      "string:string:string:string:string:string:string:string:string:string:string:string:string";

   public String getDatabase() {
     return dbName;
diff --git ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
index 4782213..19cde2f 100644
--- ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
+++ ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
@@ -1376,4 +1376,16 @@ public void testMultiInsert() throws Exception {
     ShowLocksResponse rsp = ((DbLockManager)txnMgr.getLockManager()).getLocks(rqst);
     return rsp.getLocks();
   }
+
+  @Test
+  public void testShowLocksAgentInfo() throws Exception {
+    CommandProcessorResponse cpr = driver.run("create table if not exists XYZ (a int, b int)");
+    checkCmdOnDriver(cpr);
+    checkCmdOnDriver(driver.compileAndRespond("select a from XYZ where b = 8"));
+    txnMgr.acquireLocks(driver.getPlan(), ctx, "XYZ");
+    List locks = getLocks(txnMgr);
+    Assert.assertEquals("Unexpected lock count", 1, locks.size());
+    checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "XYZ", null, locks.get(0));
+    Assert.assertEquals("Wrong AgentInfo", driver.getPlan().getQueryId(), locks.get(0).getAgentInfo());
+  }
 }
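A brief sketch of how a monitoring or debugging tool might consume the new field (assumed code, not part of the patch; msClient stands for an existing IMetaStoreClient). For locks taken by SQL statements the agent info is the query id, as the test above asserts; for streaming clients it is whatever string was passed to HiveEndPoint.newConnection(..., agentInfo):

    ShowLocksResponse rsp = msClient.showLocks(new ShowLocksRequest());
    for (ShowLocksResponseElement lock : rsp.getLocks()) {
      // Print one line per lock, keeping the same NULL convention DDLTask uses above.
      System.out.println(lock.getLockid() + " " + lock.getDbname() + "." + lock.getTablename()
          + " " + lock.getState() + " agentInfo="
          + (lock.getAgentInfo() == null ? "NULL" : lock.getAgentInfo()));
    }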
diff --git ql/src/test/results/clientpositive/dbtxnmgr_showlocks.q.out ql/src/test/results/clientpositive/dbtxnmgr_showlocks.q.out
index c1adeb3..ef07a2a 100644
--- ql/src/test/results/clientpositive/dbtxnmgr_showlocks.q.out
+++ ql/src/test/results/clientpositive/dbtxnmgr_showlocks.q.out
@@ -2,17 +2,17 @@ PREHOOK: query: show locks
 PREHOOK: type: SHOWLOCKS
 POSTHOOK: query: show locks
 POSTHOOK: type: SHOWLOCKS
-Lock ID Database Table Partition State Blocked By Type Transaction ID Last Hearbeat Acquired At User Hostname
+Lock ID Database Table Partition State Blocked By Type Transaction ID Last Hearbeat Acquired At User Hostname Agent Info
 PREHOOK: query: show locks extended
 PREHOOK: type: SHOWLOCKS
 POSTHOOK: query: show locks extended
 POSTHOOK: type: SHOWLOCKS
-Lock ID Database Table Partition State Blocked By Type Transaction ID Last Hearbeat Acquired At User Hostname
+Lock ID Database Table Partition State Blocked By Type Transaction ID Last Hearbeat Acquired At User Hostname Agent Info
 PREHOOK: query: show locks default
 PREHOOK: type: SHOWLOCKS
 POSTHOOK: query: show locks default
 POSTHOOK: type: SHOWLOCKS
-Lock ID Database Table Partition State Blocked By Type Transaction ID Last Hearbeat Acquired At User Hostname
+Lock ID Database Table Partition State Blocked By Type Transaction ID Last Hearbeat Acquired At User Hostname Agent Info
 PREHOOK: query: show transactions
 PREHOOK: type: SHOW TRANSACTIONS
 POSTHOOK: query: show transactions
@@ -30,27 +30,27 @@ PREHOOK: query: show locks database default
 PREHOOK: type: SHOWLOCKS
 POSTHOOK: query: show locks database default
 POSTHOOK: type: SHOWLOCKS
-Lock ID Database Table Partition State Blocked By Type Transaction ID Last Hearbeat Acquired At User Hostname
+Lock ID Database Table Partition State Blocked By Type Transaction ID Last Hearbeat Acquired At User Hostname Agent Info
 PREHOOK: query: show locks partitioned_acid_table
 PREHOOK: type: SHOWLOCKS
 POSTHOOK: query: show locks partitioned_acid_table
 POSTHOOK: type: SHOWLOCKS
-Lock ID Database Table Partition State Blocked By Type Transaction ID Last Hearbeat Acquired At User Hostname
+Lock ID Database Table Partition State Blocked By Type Transaction ID Last Hearbeat Acquired At User Hostname Agent Info
 PREHOOK: query: show locks partitioned_acid_table extended
 PREHOOK: type: SHOWLOCKS
 POSTHOOK: query: show locks partitioned_acid_table extended
 POSTHOOK: type: SHOWLOCKS
-Lock ID Database Table Partition State Blocked By Type Transaction ID Last Hearbeat Acquired At User Hostname
+Lock ID Database Table Partition State Blocked By Type Transaction ID Last Hearbeat Acquired At User Hostname Agent Info
 PREHOOK: query: show locks partitioned_acid_table partition (p='abc')
 PREHOOK: type: SHOWLOCKS
 POSTHOOK: query: show locks partitioned_acid_table partition (p='abc')
 POSTHOOK: type: SHOWLOCKS
-Lock ID Database Table Partition State Blocked By Type Transaction ID Last Hearbeat Acquired At User Hostname
+Lock ID Database Table Partition State Blocked By Type Transaction ID Last Hearbeat Acquired At User Hostname Agent Info
 PREHOOK: query: show locks partitioned_acid_table partition (p='abc') extended
 PREHOOK: type: SHOWLOCKS
 POSTHOOK: query: show locks partitioned_acid_table partition (p='abc') extended
 POSTHOOK: type: SHOWLOCKS
-Lock ID Database Table Partition State Blocked By Type Transaction ID Last Hearbeat Acquired At User Hostname
+Lock ID Database Table Partition State Blocked By Type Transaction ID Last Hearbeat Acquired At User Hostname Agent Info
 PREHOOK: query: drop table partitioned_acid_table
 PREHOOK: type: DROPTABLE
 PREHOOK: Input: default@partitioned_acid_table