diff --git hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java
index 0c6b9ea..5b0dd49 100644
--- hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java
+++ hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java
@@ -19,6 +19,7 @@
 package org.apache.hive.hcatalog.streaming;
 
+import org.apache.hadoop.security.UserGroupInformation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.fs.Path;
@@ -43,6 +44,7 @@
 import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
@@ -68,25 +70,53 @@
   private Long curBatchMinTxnId;
   private Long curBatchMaxTxnId;
 
+  private static final class TableWriterPair {
+    private final Table tbl;
+    private final Path partitionPath;
+    TableWriterPair(Table t, Path p) {
+      tbl = t;
+      partitionPath = p;
+    }
+  }
   protected AbstractRecordWriter(HiveEndPoint endPoint, HiveConf conf)
+          throws ConnectionError, StreamingException {
+    this(endPoint, conf, null);
+  }
+  AbstractRecordWriter(HiveEndPoint endPoint2, HiveConf conf, StreamingConnection conn)
           throws ConnectionError, StreamingException {
-    this.endPoint = endPoint;
+    this.endPoint = endPoint2;
     this.conf = conf!=null ? conf
             : HiveEndPoint.createHiveConf(DelimitedInputWriter.class, endPoint.metaStoreUri);
     try {
       msClient = HCatUtil.getHiveMetastoreClient(this.conf);
-      this.tbl = msClient.getTable(endPoint.database, endPoint.table);
-      this.partitionPath = getPathForEndPoint(msClient, endPoint);
+      UserGroupInformation ugi = conn != null ? conn.getUserGroupInformation() : null;
+      if (ugi == null) {
+        this.tbl = msClient.getTable(endPoint.database, endPoint.table);
+        this.partitionPath = getPathForEndPoint(msClient, endPoint);
+      } else {
+        TableWriterPair twp = ugi.doAs(
+            new PrivilegedExceptionAction<TableWriterPair>() {
+              @Override
+              public TableWriterPair run() throws Exception {
+                return new TableWriterPair(msClient.getTable(endPoint.database, endPoint.table),
+                    getPathForEndPoint(msClient, endPoint));
+              }
+            });
+        this.tbl = twp.tbl;
+        this.partitionPath = twp.partitionPath;
+      }
       this.totalBuckets = tbl.getSd().getNumBuckets();
-      if(totalBuckets <= 0) {
+      if (totalBuckets <= 0) {
         throw new StreamingException("Cannot stream to table that has not been bucketed : "
-                + endPoint);
+          + endPoint);
       }
-      this.bucketIds = getBucketColIDs(tbl.getSd().getBucketCols(), tbl.getSd().getCols()) ;
+      this.bucketIds = getBucketColIDs(tbl.getSd().getBucketCols(), tbl.getSd().getCols());
       this.bucketFieldData = new Object[bucketIds.size()];
       String outFormatName = this.tbl.getSd().getOutputFormat();
-      outf = (AcidOutputFormat) ReflectionUtils.newInstance(JavaUtils.loadClass(outFormatName), conf);
+      outf = (AcidOutputFormat) ReflectionUtils.newInstance(JavaUtils.loadClass(outFormatName), conf);
       bucketFieldData = new Object[bucketIds.size()];
+    } catch(InterruptedException e) {
+      throw new StreamingException(endPoint2.toString(), e);
     } catch (MetaException e) {
       throw new ConnectionError(endPoint, e);
     } catch (NoSuchObjectException e) {
diff --git hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/DelimitedInputWriter.java hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/DelimitedInputWriter.java
index 394cc54..0d9d1e7 100644
--- hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/DelimitedInputWriter.java
+++ hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/DelimitedInputWriter.java
@@ -20,6 +20,7 @@
 import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -98,7 +99,11 @@ public DelimitedInputWriter(String[] colNamesForFields, String delimiter,
     this(colNamesForFields, delimiter, endPoint, conf,
         (char) LazySerDeParameters.DefaultSeparators[0]);
   }
-
+  public DelimitedInputWriter(String[] colNamesForFields, String delimiter,
+                              HiveEndPoint endPoint, HiveConf conf, char serdeSeparator)
+          throws ClassNotFoundException, StreamingException {
+    this(colNamesForFields, delimiter, endPoint, conf, serdeSeparator, null);
+  }
   /**
    * Constructor. Allows overriding separator of the LazySimpleSerde
    * @param colNamesForFields Column name assignment for input fields
@@ -108,6 +113,7 @@ public DelimitedInputWriter(String[] colNamesForFields, String delimiter,
    * @param serdeSeparator separator used when encoding data that is fed into the
    *                       LazySimpleSerde. Ensure this separator does not occur
    *                       in the field data
+   * @param conn connection this Writer is to be used with. may be null
    * @throws ConnectionError Problem talking to Hive
    * @throws ClassNotFoundException Serde class not found
    * @throws SerializationError Serde initialization/interaction failed
@@ -115,10 +121,10 @@ public DelimitedInputWriter(String[] colNamesForFields, String delimiter,
    * @throws InvalidColumn any element in colNamesForFields refers to a non existing column
    */
   public DelimitedInputWriter(String[] colNamesForFields, String delimiter,
-                              HiveEndPoint endPoint, HiveConf conf, char serdeSeparator)
+                              HiveEndPoint endPoint, HiveConf conf, char serdeSeparator, StreamingConnection conn)
          throws ClassNotFoundException, ConnectionError, SerializationError,
                 InvalidColumn, StreamingException {
-    super(endPoint, conf);
+    super(endPoint, conf, conn);
     this.tableColumns = getCols(tbl);
     this.serdeSeparator = serdeSeparator;
     this.delimiter = delimiter;
diff --git hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
index 452cb15..afad079 100644
--- hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
+++ hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
@@ -395,6 +395,9 @@ public Void run() throws Exception {
         }
       }
 
+    public UserGroupInformation getUserGroupInformation() {
+      return ugi;
+    }
 
     /**
      * Acquires a new batch of transactions from Hive.
diff --git hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StreamingConnection.java hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StreamingConnection.java
index 25acff0..8785a21 100644
--- hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StreamingConnection.java
+++ hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StreamingConnection.java
@@ -18,6 +18,8 @@
 
 package org.apache.hive.hcatalog.streaming;
 
+import org.apache.hadoop.security.UserGroupInformation;
+
 /**
  * Represents a connection to a HiveEndPoint. Used to acquire transaction batches.
  */
@@ -46,4 +48,8 @@ public TransactionBatch fetchTransactionBatch(int numTransactionsHint,
    */
   public void close();
 
+  /**
+   * @return UserGroupInformation associated with this connection or {@code null} if there is none
+   */
+  UserGroupInformation getUserGroupInformation();
 }
diff --git hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictJsonWriter.java hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictJsonWriter.java
index db73d6b..a03ae8e 100644
--- hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictJsonWriter.java
+++ hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictJsonWriter.java
@@ -57,17 +57,21 @@ public StrictJsonWriter(HiveEndPoint endPoint)
     this(endPoint, null);
   }
 
+  public StrictJsonWriter(HiveEndPoint endPoint, HiveConf conf) throws StreamingException {
+    this(endPoint, conf, null);
+  }
   /**
    *
    * @param endPoint the end point to write to
   * @param conf a Hive conf object. Should be null if not using advanced Hive settings.
+   * @param conn connection this Writer is to be used with. may be null
   * @throws ConnectionError
   * @throws SerializationError
   * @throws StreamingException
   */
-  public StrictJsonWriter(HiveEndPoint endPoint, HiveConf conf)
+  public StrictJsonWriter(HiveEndPoint endPoint, HiveConf conf, StreamingConnection conn)
     throws ConnectionError, SerializationError, StreamingException {
-    super(endPoint, conf);
+    super(endPoint, conf, conn);
     this.serde = createSerde(tbl, conf);
     // get ObjInspectors for entire record and bucketed cols
     try {
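
For reviewers, a minimal usage sketch (not part of the patch) of how a client might thread the StreamingConnection into the new writer constructors so that the writer's metastore lookups run under the connection's UserGroupInformation. The metastore URI, database, table, class name, and record contents below are placeholders, and it assumes the existing UGI-aware HiveEndPoint.newConnection(boolean, HiveConf, UserGroupInformation) overload and a table already set up for streaming ingest (transactional, bucketed, ORC).

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hive.hcatalog.streaming.HiveEndPoint;
import org.apache.hive.hcatalog.streaming.StreamingConnection;
import org.apache.hive.hcatalog.streaming.StrictJsonWriter;
import org.apache.hive.hcatalog.streaming.TransactionBatch;

public class SecureStreamingSketch {
  public static void main(String[] args) throws Exception {
    // Placeholder endpoint; the target table must be transactional and bucketed.
    HiveEndPoint endPoint =
        new HiveEndPoint("thrift://metastore.example.com:9083", "default", "alerts", null);
    HiveConf conf = new HiveConf();

    // Assumes the process has already logged in (e.g. via keytab) as the streaming user.
    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();

    // The connection remembers the UGI; getUserGroupInformation() exposes it to the writer.
    StreamingConnection connection = endPoint.newConnection(false, conf, ugi);
    try {
      // Passing the connection makes AbstractRecordWriter resolve the table and
      // partition path inside ugi.doAs(...), matching the connection's identity.
      StrictJsonWriter writer = new StrictJsonWriter(endPoint, conf, connection);

      TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer);
      try {
        txnBatch.beginNextTransaction();
        txnBatch.write("{\"id\":1,\"msg\":\"hello\"}".getBytes("UTF-8"));
        txnBatch.commit();
      } finally {
        txnBatch.close();
      }
    } finally {
      connection.close();
    }
  }
}

Without the new conn argument the writers keep the old behavior and contact the metastore as the process's login user.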