diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java index b09e7ae..5ab3ebb 100644 --- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java +++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java @@ -107,7 +107,7 @@ public HiveEndPoint(String metaStoreUri public StreamingConnection newConnection(final boolean createPartIfNotExists) throws ConnectionError, InvalidPartition, InvalidTable, PartitionCreationFailed , ImpersonationFailed , InterruptedException { - return newConnection(null, createPartIfNotExists, null); + return newConnection(createPartIfNotExists, null, null); } /** @@ -126,67 +126,63 @@ public StreamingConnection newConnection(final boolean createPartIfNotExists) public StreamingConnection newConnection(final boolean createPartIfNotExists, HiveConf conf) throws ConnectionError, InvalidPartition, InvalidTable, PartitionCreationFailed , ImpersonationFailed , InterruptedException { - return newConnection(null, createPartIfNotExists, conf); + return newConnection(createPartIfNotExists, conf, null); } /** * Acquire a new connection to MetaStore for streaming - * @param proxyUser User on whose behalf all hdfs and hive operations will be - * performed on this connection. Set it to null or empty string - * to connect as user of current process without impersonation. - * Currently this argument is not supported and must be null * @param createPartIfNotExists If true, the partition specified in the endpoint * will be auto created if it does not exist + * @param authenticatedUser UserGroupInformation object obtained from successful authentication. + * Uses insecure mode if this argument is null. * @return - * @throws ConnectionError if problem connecting + * @throws ConnectionError if there is a connection problem * @throws InvalidPartition if specified partition is not valid (createPartIfNotExists = false) - * @throws ImpersonationFailed if not able to impersonate 'proxyUser' + * @throws ImpersonationFailed if not able to impersonate 'username' * @throws IOException if there was an I/O error when acquiring connection * @throws PartitionCreationFailed if failed to create partition * @throws InterruptedException */ - private StreamingConnection newConnection(final String proxyUser, - final boolean createPartIfNotExists, final HiveConf conf) + public StreamingConnection newConnection(final boolean createPartIfNotExists, final HiveConf conf, + final UserGroupInformation authenticatedUser) throws ConnectionError, InvalidPartition, InvalidTable, PartitionCreationFailed, ImpersonationFailed , InterruptedException { - if (proxyUser ==null || proxyUser.trim().isEmpty() ) { - return newConnectionImpl(System.getProperty("user.name"), null, createPartIfNotExists, conf); + + if( authenticatedUser==null ) { + return newConnectionImpl(authenticatedUser, createPartIfNotExists, conf); } - final UserGroupInformation ugi = getUserGroupInfo(proxyUser); + try { - return ugi.doAs ( - new PrivilegedExceptionAction() { + return authenticatedUser.doAs ( + new PrivilegedExceptionAction() { @Override public StreamingConnection run() throws ConnectionError, InvalidPartition, InvalidTable , PartitionCreationFailed { - return newConnectionImpl(proxyUser, ugi, createPartIfNotExists, conf); + return newConnectionImpl(authenticatedUser, createPartIfNotExists, conf); } - } + } ); } catch (IOException e) { - throw new ImpersonationFailed("Failed to impersonate '" + proxyUser + - "' when acquiring connection", e); + throw new ConnectionError("Failed to connect as : " + authenticatedUser.getShortUserName(), e); } } - - - private StreamingConnection newConnectionImpl(String proxyUser, UserGroupInformation ugi, + private StreamingConnection newConnectionImpl(UserGroupInformation ugi, boolean createPartIfNotExists, HiveConf conf) throws ConnectionError, InvalidPartition, InvalidTable , PartitionCreationFailed { - return new ConnectionImpl(this, proxyUser, ugi, conf, createPartIfNotExists); + return new ConnectionImpl(this, ugi, conf, createPartIfNotExists); } - private static UserGroupInformation getUserGroupInfo(String proxyUser) + private static UserGroupInformation getUserGroupInfo(String user) throws ImpersonationFailed { try { return UserGroupInformation.createProxyUser( - proxyUser, UserGroupInformation.getLoginUser()); + user, UserGroupInformation.getLoginUser()); } catch (IOException e) { - LOG.error("Unable to login as proxy user. Exception follows.", e); - throw new ImpersonationFailed(proxyUser,e); + LOG.error("Unable to get UserGroupInfo for user : " + user, e); + throw new ImpersonationFailed(user,e); } } @@ -242,14 +238,12 @@ public String toString() { private static class ConnectionImpl implements StreamingConnection { private final IMetaStoreClient msClient; private final HiveEndPoint endPt; - private final String proxyUser; private final UserGroupInformation ugi; + private final String username; /** - * * @param endPoint end point to connect to - * @param proxyUser can be null - * @param ugi of prody user. If ugi is null, impersonation of proxy user will be disabled + * @param ugi on behalf of whom streaming is done. cannot be null * @param conf HiveConf object * @param createPart create the partition if it does not exist * @throws ConnectionError if there is trouble connecting @@ -257,15 +251,15 @@ public String toString() { * @throws InvalidTable if specified table does not exist * @throws PartitionCreationFailed if createPart=true and not able to create partition */ - private ConnectionImpl(HiveEndPoint endPoint, String proxyUser, UserGroupInformation ugi, + private ConnectionImpl(HiveEndPoint endPoint, UserGroupInformation ugi, HiveConf conf, boolean createPart) throws ConnectionError, InvalidPartition, InvalidTable , PartitionCreationFailed { - this.proxyUser = proxyUser; this.endPt = endPoint; this.ugi = ugi; + this.username = ugi==null ? System.getProperty("user.name") : ugi.getShortUserName(); if (conf==null) { - conf = HiveEndPoint.createHiveConf(this.getClass(),endPoint.metaStoreUri); + conf = HiveEndPoint.createHiveConf(this.getClass(), endPoint.metaStoreUri); } this.msClient = getMetaStoreClient(endPoint, conf); if (createPart && !endPoint.partitionVals.isEmpty()) { @@ -324,21 +318,21 @@ public TransactionBatch fetchTransactionBatch(final int numTransactions, return ugi.doAs ( new PrivilegedExceptionAction() { @Override - public TransactionBatch run() throws StreamingException { + public TransactionBatch run() throws StreamingException, InterruptedException { return fetchTransactionBatchImpl(numTransactions, recordWriter); } } ); } catch (IOException e) { - throw new ImpersonationFailed("Failed impersonating proxy user '" + proxyUser + - "' when acquiring Transaction Batch on endPoint " + endPt, e); + throw new ImpersonationFailed("Failed to fetch Txn Batch as user '" + ugi.getShortUserName() + + "' when acquiring Transaction Batch on endPoint " + endPt, e); } } private TransactionBatch fetchTransactionBatchImpl(int numTransactions, RecordWriter recordWriter) - throws StreamingException, TransactionBatchUnAvailable { - return new TransactionBatchImpl(proxyUser, ugi, endPt, numTransactions, msClient + throws StreamingException, TransactionBatchUnAvailable, InterruptedException { + return new TransactionBatchImpl(username, ugi, endPt, numTransactions, msClient , recordWriter); } @@ -445,7 +439,7 @@ private static IMetaStoreClient getMetaStoreClient(HiveEndPoint endPoint, HiveCo } // class ConnectionImpl private static class TransactionBatchImpl implements TransactionBatch { - private final String proxyUser; + private final String username; private final UserGroupInformation ugi; private final HiveEndPoint endPt; private final IMetaStoreClient msClient; @@ -461,7 +455,7 @@ private static IMetaStoreClient getMetaStoreClient(HiveEndPoint endPoint, HiveCo /** * Represents a batch of transactions acquired from MetaStore * - * @param proxyUser + * @param user * @param ugi * @param endPt * @param numTxns @@ -470,9 +464,9 @@ private static IMetaStoreClient getMetaStoreClient(HiveEndPoint endPoint, HiveCo * @throws StreamingException if failed to create new RecordUpdater for batch * @throws TransactionBatchUnAvailable if failed to acquire a new Transaction batch */ - private TransactionBatchImpl(String proxyUser, UserGroupInformation ugi, HiveEndPoint endPt - , int numTxns, IMetaStoreClient msClient, RecordWriter recordWriter) - throws StreamingException, TransactionBatchUnAvailable { + private TransactionBatchImpl(final String user, UserGroupInformation ugi, HiveEndPoint endPt + , final int numTxns, final IMetaStoreClient msClient, RecordWriter recordWriter) + throws StreamingException, TransactionBatchUnAvailable, InterruptedException { try { if ( endPt.partitionVals!=null && !endPt.partitionVals.isEmpty() ) { Table tableObj = msClient.getTable(endPt.database, endPt.table); @@ -481,20 +475,38 @@ private TransactionBatchImpl(String proxyUser, UserGroupInformation ugi, HiveEnd } else { partNameForLock = null; } - this.proxyUser = proxyUser; + this.username = user; this.ugi = ugi; this.endPt = endPt; this.msClient = msClient; this.recordWriter = recordWriter; - this.txnIds = msClient.openTxns(proxyUser, numTxns).getTxn_ids(); + + txnIds = openTxnImpl(msClient, user, numTxns, ugi); + + this.currentTxnIndex = -1; this.state = TxnState.INACTIVE; recordWriter.newBatch(txnIds.get(0), txnIds.get(txnIds.size()-1)); } catch (TException e) { throw new TransactionBatchUnAvailable(endPt, e); + } catch (IOException e) { + throw new TransactionBatchUnAvailable(endPt, e); } } + private List openTxnImpl(final IMetaStoreClient msClient, final String user, final int numTxns, UserGroupInformation ugi) + throws IOException, TException, InterruptedException { + if(ugi==null) { + return msClient.openTxns(user, numTxns).getTxn_ids(); + } + return (List) ugi.doAs(new PrivilegedExceptionAction() { + @Override + public Object run() throws Exception { + return msClient.openTxns(user, numTxns).getTxn_ids(); + } + }) ; + } + @Override public String toString() { if (txnIds==null || txnIds.isEmpty()) { @@ -526,8 +538,8 @@ public Void run() throws TransactionError { } ); } catch (IOException e) { - throw new ImpersonationFailed("Failed impersonating proxyUser '" + proxyUser + - "' when switch to next Transaction for endPoint :" + endPt, e); + throw new ImpersonationFailed("Failed switching to next Txn as user '" + username + + "' in Txn batch :" + this, e); } } @@ -536,7 +548,7 @@ private void beginNextTransactionImpl() throws TransactionError { throw new InvalidTrasactionState("No more transactions available in" + " current batch for end point : " + endPt); ++currentTxnIndex; - lockRequest = createLockRequest(endPt, partNameForLock, proxyUser, getCurrentTxnId()); + lockRequest = createLockRequest(endPt, partNameForLock, username, getCurrentTxnId()); try { LockResponse res = msClient.lock(lockRequest); if (res.getState() != LockState.ACQUIRED) { @@ -608,8 +620,8 @@ public Void run() throws StreamingException { } ); } catch (IOException e) { - throw new ImpersonationFailed("Failed impersonating proxy user '" + proxyUser + - "' when writing to endPoint :" + endPt + ". Transaction Id: " + throw new ImpersonationFailed("Failed wirting as user '" + username + + "' to endPoint :" + endPt + ". Transaction Id: " + getCurrentTxnId(), e); } } @@ -641,8 +653,8 @@ public Void run() throws StreamingException { } ); } catch (IOException e) { - throw new ImpersonationFailed("Failed impersonating proxyUser '" + proxyUser + - "' when writing to endPoint :" + endPt + ". Transaction Id: " + throw new ImpersonationFailed("Failed writing as user '" + username + + "' to endPoint :" + endPt + ". Transaction Id: " + getCurrentTxnId(), e); } } @@ -680,9 +692,8 @@ public Void run() throws StreamingException { } ); } catch (IOException e) { - throw new ImpersonationFailed("Failed impersonating proxy user '" + proxyUser + - "' when committing Txn on endPoint :" + endPt + ". Transaction Id: " - + getCurrentTxnId(), e); + throw new ImpersonationFailed("Failed committing Txn ID " + getCurrentTxnId() + " as user '" + + username + "'on endPoint :" + endPt + ". Transaction Id: ", e); } } @@ -726,9 +737,8 @@ public Void run() throws StreamingException { } ); } catch (IOException e) { - throw new ImpersonationFailed("Failed impersonating proxy user '" + proxyUser + - "' when aborting Txn on endPoint :" + endPt + ". Transaction Id: " - + getCurrentTxnId(), e); + throw new ImpersonationFailed("Failed aborting Txn " + getCurrentTxnId() + " as user '" + + username + "' on endPoint :" + endPt, e); } } @@ -784,8 +794,8 @@ public Void run() throws StreamingException { } ); } catch (IOException e) { - throw new ImpersonationFailed("Failed impersonating proxy user '" + proxyUser + - "' when closing Txn Batch on endPoint :" + endPt, e); + throw new ImpersonationFailed("Failed closing Txn Batch as user '" + username + + "' on endPoint :" + endPt, e); } }