diff --git metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java
index 209349b..28b794e 100644
--- metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java
+++ metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java
@@ -1042,6 +1042,24 @@ public void run() {
thread.start();
loopUntilHMSReady(port);
}
+
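+  /**
+   * Starts a MetaStore Thrift server on the given port, using the supplied
+   * HadoopThriftAuthBridge and HiveConf, in a daemon thread, and blocks until
+   * the server is accepting connections.
+   */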
+ public static void startMetaStore(final int port,
+ final HadoopThriftAuthBridge bridge, final HiveConf conf) throws Exception {
+ Thread thread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ HiveMetaStore.startMetaStore(port, bridge, conf);
+ } catch (Throwable e) {
+ LOG.error("Metastore Thrift Server threw an exception...",e);
+ }
+ }
+ });
+ thread.setDaemon(true);
+ thread.start();
+ loopUntilHMSReady(port);
+ }
+
/**
* A simple connect test to make sure that the metastore is up
* @throws Exception
diff --git packaging/pom.xml packaging/pom.xml
index de9b002..3601b93 100644
--- packaging/pom.xml
+++ packaging/pom.xml
@@ -132,6 +132,11 @@
       <version>${project.version}</version>
     </dependency>
     <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-streaming</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
       <groupId>org.apache.hive.hcatalog</groupId>
       <artifactId>hive-hcatalog-hbase-storage-handler</artifactId>
       <version>${project.version}</version>
diff --git packaging/src/main/assembly/src.xml packaging/src/main/assembly/src.xml
index bdaa47b..d9b0e6b 100644
--- packaging/src/main/assembly/src.xml
+++ packaging/src/main/assembly/src.xml
@@ -71,6 +71,7 @@
       <include>service/**/*</include>
       <include>shims/**/*</include>
       <include>testutils/**/*</include>
+      <include>streaming/**/*</include>
     </includes>
     <outputDirectory>/</outputDirectory>
diff --git pom.xml pom.xml
index 7343683..5463069 100644
--- pom.xml
+++ pom.xml
@@ -47,6 +47,7 @@
     <module>service</module>
     <module>shims</module>
     <module>testutils</module>
+    <module>streaming</module>
     <module>packaging</module>
diff --git streaming/pom.xml streaming/pom.xml
new file mode 100644
index 0000000..ee45207
--- /dev/null
+++ streaming/pom.xml
@@ -0,0 +1,117 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.hive</groupId>
+    <artifactId>hive</artifactId>
+    <version>0.14.0-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>hive-streaming</artifactId>
+  <packaging>jar</packaging>
+  <name>Hive Streaming</name>
+
+  <properties>
+    <hive.path.to.root>..</hive.path.to.root>
+  </properties>
+
+  <profiles>
+    <profile>
+      <id>hadoop-1</id>
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-core</artifactId>
+          <optional>true</optional>
+        </dependency>
+      </dependencies>
+    </profile>
+    <profile>
+      <id>hadoop-2</id>
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-common</artifactId>
+          <optional>true</optional>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-mapreduce-client-core</artifactId>
+          <optional>true</optional>
+        </dependency>
+      </dependencies>
+    </profile>
+  </profiles>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-serde</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-metastore</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-exec</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-cli</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <version>${junit.version}</version>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <sourceDirectory>${basedir}/src/java</sourceDirectory>
+    <testSourceDirectory>${basedir}/src/test</testSourceDirectory>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <executions>
+          <execution>
+            <goals>
+              <goal>test-jar</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+
+</project>
diff --git streaming/src/java/org/apache/hive/streaming/AbstractLazySimpleRecordWriter.java streaming/src/java/org/apache/hive/streaming/AbstractLazySimpleRecordWriter.java
new file mode 100644
index 0000000..58fee51
--- /dev/null
+++ streaming/src/java/org/apache/hive/streaming/AbstractLazySimpleRecordWriter.java
@@ -0,0 +1,249 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
+import org.apache.hadoop.hive.ql.io.RecordUpdater;
+import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.thrift.TException;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.Random;
+
+/**
+ * Uses LazySimpleSerDe to encode each record.
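+ * Incoming records are reordered to match the table's column order, deserialized
+ * with LazySimpleSerDe, and handed to an ORC RecordUpdater for ACID inserts.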
+ */
+abstract class AbstractLazySimpleRecordWriter implements RecordWriter {
+
+ private final Path partitionPath;
+ private final int totalBuckets;
+
+ private OrcOutputFormat outf = new OrcOutputFormat();
+ private final HiveConf conf;
+ private RecordUpdater updater = null;
+  private final ArrayList<String> tableColumns;
+ private final static String serdeClassName = "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe";
+ private LazySimpleSerDe serde;
+ private ObjectInspector inspector;
+ private Random rand = new Random();
+ private int currentBucketId = 0;
+
+ private char serdeSeparator;
+
+ static final private Log LOG = LogFactory.getLog(AbstractLazySimpleRecordWriter.class.getName());
+
+  /** Base type for handling delimited text input. Uses LazySimpleSerDe
+   * to convert the input byte[] into its underlying Object representation.
+   * @param endPoint the Hive table/partition to write to
+ * @throws ConnectionError Problem talking to Hive
+ * @throws ClassNotFoundException Serde class not found
+ * @throws SerializationError Serde initialization/interaction failed
+ * @throws StreamingException Problem acquiring file system path for partition,
+ * or if table is not bucketed
+ */
+ protected AbstractLazySimpleRecordWriter(HiveEndPoint endPoint)
+ throws ConnectionError, ClassNotFoundException, SerializationError
+ , StreamingException {
+ this(endPoint, (char)LazySimpleSerDe.DefaultSeparators[0]);
+ }
+
+ protected AbstractLazySimpleRecordWriter(HiveEndPoint endPoint, char serdeSeparator)
+ throws ConnectionError, ClassNotFoundException, SerializationError
+ , StreamingException {
+ try {
+ conf = HiveEndPoint.createHiveConf(this.getClass(), endPoint.metaStoreUri);
+
+ Hive hive = Hive.get(conf, false);
+
+ Table tbl = hive.getTable(endPoint.database, endPoint.table);
+ Properties tableProps = MetaStoreUtils.getTableMetadata(tbl.getTTable());
+ tableProps.setProperty("field.delim", String.valueOf(serdeSeparator));
+ this.serde = createSerde(tableProps, conf);
+ this.inspector = this.serde.getObjectInspector();
+ this.tableColumns = getPartitionCols(tbl);
+ this.partitionPath = getPathForEndPoint(hive, endPoint);
+ this.totalBuckets = tbl.getNumBuckets();
+ if(totalBuckets <= 0) {
+ throw new StreamingException("Cannot stream to table that has not been bucketed : " + endPoint);
+ }
+ this.serdeSeparator = serdeSeparator;
+
+ } catch (HiveException e) {
+ throw new ConnectionError("Problem connecting to Hive", e);
+ } catch (SerDeException e) {
+ throw new SerializationError("Failed to get object inspector from Serde "
+ + serdeClassName, e);
+ }
+ }
+
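+  // Opens an ORC ACID RecordUpdater for the given bucket of the partition,
+  // covering the transaction id range of the current batch.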
+ private RecordUpdater createRecordUpdater(int bucketId, Long minTxnId, Long maxTxnID)
+ throws IOException {
+ return outf.getRecordUpdater(partitionPath,
+ new AcidOutputFormat.Options(conf)
+ .inspector(inspector)
+ .bucket(bucketId)
+ .minimumTransactionId(minTxnId)
+ .maximumTransactionId(maxTxnID));
+ }
+
+ protected abstract byte[] reorderFields(byte[] record)
+ throws UnsupportedEncodingException;
+
+  protected ArrayList<String> getTableColumns() {
+ return tableColumns;
+ }
+
+ @Override
+ public void write(long transactionId, byte[] record)
+ throws SerializationError, StreamingIOFailure {
+ try {
+ byte[] orderedFields = reorderFields(record);
+ Object encodedRow = encode(orderedFields);
+ updater.insert(transactionId, encodedRow);
+ } catch (IOException e) {
+ throw new StreamingIOFailure("Error writing record in transaction("
+ + transactionId + ")", e);
+ }
+ }
+
+ @Override
+ public void flush() throws StreamingIOFailure {
+ try {
+ updater.flush();
+ } catch (IOException e) {
+ throw new StreamingIOFailure("Unable to flush recordUpdater", e);
+ }
+ }
+
+ @Override
+ public void clear() throws StreamingIOFailure {
+ return;
+ }
+
+ /**
+ * Creates a new record updater for the new batch
+ * @param minTxnId
+ * @param maxTxnID
+ * @throws StreamingIOFailure if failed to create record updater
+ */
+ @Override
+ public void newBatch(Long minTxnId, Long maxTxnID) throws StreamingIOFailure {
+ try {
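+      // A bucket is picked at random for each batch; all records written in this batch go to that bucket.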
+ this.currentBucketId = rand.nextInt(totalBuckets);
+ LOG.debug("Creating Record updater");
+ updater = createRecordUpdater(currentBucketId, minTxnId, maxTxnID);
+ } catch (IOException e) {
+ LOG.error("Failed creating record updater", e);
+ throw new StreamingIOFailure("Unable to get new record Updater", e);
+ }
+ }
+
+ @Override
+ public void closeBatch() throws StreamingIOFailure {
+ try {
+ updater.close(false);
+ updater = null;
+ } catch (IOException e) {
+ throw new StreamingIOFailure("Unable to close recordUpdater", e);
+ }
+ }
+
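+  // Deserializes the delimited byte[] into the row Object that RecordUpdater.insert() expects.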
+ private Object encode(byte[] record) throws SerializationError {
+ try {
+ BytesWritable blob = new BytesWritable();
+ blob.set(record, 0, record.length);
+ return serde.deserialize(blob);
+ } catch (SerDeException e) {
+ throw new SerializationError("Unable to convert byte[] record into Object", e);
+ }
+ }
+
+ /**
+ * Creates LazySimpleSerde
+ * @param tableProps used to create serde
+ * @param conf used to create serde
+   * @return the initialized LazySimpleSerDe
+ * @throws ClassNotFoundException if serde class not found
+ * @throws SerializationError if serde could not be initialized
+ */
+ private static LazySimpleSerDe createSerde(Properties tableProps, HiveConf conf)
+ throws ClassNotFoundException, SerializationError {
+ try {
+      Class<?> serdeClass = Class.forName(serdeClassName);
+ LazySimpleSerDe serde =
+ (LazySimpleSerDe) ReflectionUtils.newInstance(serdeClass, conf);
+ serde.initialize(conf, tableProps);
+ return serde;
+ } catch (SerDeException e) {
+ throw new SerializationError("Error initializing serde", e);
+ }
+ }
+
+ private Path getPathForEndPoint(Hive hive, HiveEndPoint endPoint)
+ throws StreamingException {
+ try {
+ IMetaStoreClient msClient = hive.getMSC();
+ String location = null;
+ if(endPoint.partitionVals==null || endPoint.partitionVals.isEmpty() ) {
+ location = msClient.getTable(endPoint.database,endPoint.table)
+ .getSd().getLocation();
+ } else {
+ location = msClient.getPartition(endPoint.database, endPoint.table,
+ endPoint.partitionVals).getSd().getLocation();
+ }
+ return new Path(location);
+ } catch (TException e) {
+ throw new StreamingException(e.getMessage()
+ + ". Unable to get path for end point: "
+ + endPoint.partitionVals, e);
+ }
+ }
+
+  private ArrayList<String> getPartitionCols(Table table) {
+    List<FieldSchema> cols = table.getCols();
+    ArrayList<String> colNames = new ArrayList<String>(cols.size());
+ for(FieldSchema col : cols) {
+ colNames.add(col.getName().toLowerCase());
+ }
+ return colNames;
+ }
+
+ public char getSerdeSeparator() {
+ return serdeSeparator;
+ }
+}
diff --git streaming/src/java/org/apache/hive/streaming/ConnectionError.java streaming/src/java/org/apache/hive/streaming/ConnectionError.java
new file mode 100644
index 0000000..e0ff1da
--- /dev/null
+++ streaming/src/java/org/apache/hive/streaming/ConnectionError.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+public class ConnectionError extends StreamingException {
+
+ public ConnectionError(String msg, Exception innerEx) {
+ super(msg, innerEx);
+ }
+ public ConnectionError(Exception e) {
+ super(e.getMessage(), e);
+ }
+}
diff --git streaming/src/java/org/apache/hive/streaming/DelimitedInputWriter.java streaming/src/java/org/apache/hive/streaming/DelimitedInputWriter.java
new file mode 100644
index 0000000..da2fb4e
--- /dev/null
+++ streaming/src/java/org/apache/hive/streaming/DelimitedInputWriter.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Streaming writer that handles delimited input (e.g. CSV).
+ * Delimited input is parsed and reordered to match the column order of the table.
+ *
+ */
+public class DelimitedInputWriter extends AbstractLazySimpleRecordWriter {
+ private final boolean reorderingNeeded;
+ private String delimiter;
+ private int[] fieldToColMapping;
+
+  static final private Log LOG = LogFactory.getLog(DelimitedInputWriter.class.getName());
+
+ /** Constructor. Uses default separator of the LazySimpleSerde
+ * @param colNamesForFields Column name assignment for input fields. nulls or empty
+ * strings in the array indicates the fields to be skipped
+ * @param delimiter input field delimiter
+ * @param endPoint Hive endpoint
+ * @throws ConnectionError Problem talking to Hive
+ * @throws ClassNotFoundException Serde class not found
+ * @throws SerializationError Serde initialization/interaction failed
+ * @throws StreamingException Problem acquiring file system path for partition
+   * @throws InvalidColumn any element in colNamesForFields refers to a nonexistent column
+ */
+ public DelimitedInputWriter(String[] colNamesForFields, String delimiter,
+ HiveEndPoint endPoint)
+ throws ClassNotFoundException, ConnectionError, SerializationError,
+ InvalidColumn, StreamingException {
+ super(endPoint);
+ this.delimiter = delimiter;
+ this.fieldToColMapping = getFieldReordering(colNamesForFields, getTableColumns());
+ this.reorderingNeeded = isReorderingNeeded(delimiter, getTableColumns());
+ }
+
+ /**
+ *
+ * @param colNamesForFields Column name assignment for input fields
+ * @param delimiter input field delimiter
+ * @param endPoint Hive endpoint
+ * @param serdeSeparator separator used when encoding data that is fed into the
+ * LazySimpleSerde. Ensure this separator does not occur
+ * in the field data
+ * @throws ConnectionError Problem talking to Hive
+ * @throws ClassNotFoundException Serde class not found
+ * @throws SerializationError Serde initialization/interaction failed
+ * @throws StreamingException Problem acquiring file system path for partition
+   * @throws InvalidColumn any element in colNamesForFields refers to a nonexistent column
+ */
+ public DelimitedInputWriter(String[] colNamesForFields, String delimiter,
+ HiveEndPoint endPoint, char serdeSeparator)
+ throws ClassNotFoundException, ConnectionError, SerializationError,
+ InvalidColumn, StreamingException {
+ super(endPoint, serdeSeparator);
+ this.delimiter = delimiter;
+ this.fieldToColMapping = getFieldReordering(colNamesForFields, getTableColumns());
+ this.reorderingNeeded = isReorderingNeeded(delimiter, getTableColumns());
+ LOG.debug("Field reordering needed = " + this.reorderingNeeded + ", for endpoint " + endPoint);
+ }
+
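+  // Reordering (and re-encoding) is skipped only when the input delimiter matches the serde
+  // separator, the input fields already arrive in table column order, and the input does not
+  // carry more fields than the table has columns.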
+  private boolean isReorderingNeeded(String delimiter, ArrayList<String> tableColumns) {
+ return !( delimiter.equals(String.valueOf(getSerdeSeparator()))
+ && areFieldsInColOrder(fieldToColMapping)
+ && tableColumns.size()>=fieldToColMapping.length );
+ }
+
+  private static boolean areFieldsInColOrder(int[] fieldToColMapping) {
+    for (int i = 0; i < fieldToColMapping.length; ++i) {
+      if (fieldToColMapping[i] != i) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  @VisibleForTesting
+  static int[] getFieldReordering(String[] colNamesForFields, List<String> tableColNames)
+          throws InvalidColumn {
+    int[] result = new int[ colNamesForFields.length ];
+    for (int i = 0; i < result.length; ++i) {
+      result[i] = -1;
+    }
+    int fieldLabelCount = 0;
+    for (int i = 0; i < colNamesForFields.length; ++i) {
+      String col = colNamesForFields[i];
+      if (col == null || col.trim().isEmpty()) {
+        continue;   // null/empty field name means this input field is skipped
+      }
+      ++fieldLabelCount;
+      int colPos = tableColNames.indexOf(col.toLowerCase());
+      if (colPos == -1) {
+        throw new InvalidColumn("Column '" + col + "' not found in table");
+      }
+      result[i] = colPos;
+    }
+    if (fieldLabelCount > tableColNames.size()) {
+      throw new InvalidColumn("Number of field names exceeds the number of columns in table");
+    }
+    return result;
+  }
+
+ // Reorder fields in record based on the order of columns in the table
+ protected byte[] reorderFields(byte[] record) throws UnsupportedEncodingException {
+ if(!reorderingNeeded) {
+ return record;
+ }
+ String[] reorderedFields = new String[getTableColumns().size()];
+ String decoded = new String(record);
+ String[] fields = decoded.split(delimiter);
+    for (int i = 0; i < fields.length && i < fieldToColMapping.length; ++i) {
+      int newIndex = fieldToColMapping[i];
+      if (newIndex != -1) {
+        reorderedFields[newIndex] = fields[i];
+      }
+    }
+    // Re-encode the reordered fields using the serde separator
+    StringBuilder reordered = new StringBuilder();
+    for (int i = 0; i < reorderedFields.length; ++i) {
+      if (i > 0) {
+        reordered.append(getSerdeSeparator());
+      }
+      if (reorderedFields[i] != null) {
+        reordered.append(reorderedFields[i]);
+      }
+    }
+    return reordered.toString().getBytes("UTF-8");
+  }
+}
diff --git streaming/src/java/org/apache/hive/streaming/HeartBeatFailure.java streaming/src/java/org/apache/hive/streaming/HeartBeatFailure.java
new file mode 100644
--- /dev/null
+++ streaming/src/java/org/apache/hive/streaming/HeartBeatFailure.java
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+import java.util.Collection;
+import java.util.Set;
+
+public class HeartBeatFailure extends StreamingException {
+  private Collection<Long> abortedTxns;
+  private Collection<Long> nosuchTxns;
+
+  public HeartBeatFailure(Collection<Long> abortedTxns, Set<Long> nosuchTxns) {
+ super("Heart beat error. InvalidTxns: " + nosuchTxns + ". AbortedTxns: " + abortedTxns);
+ this.abortedTxns = abortedTxns;
+ this.nosuchTxns = nosuchTxns;
+ }
+}
diff --git streaming/src/java/org/apache/hive/streaming/HiveEndPoint.java streaming/src/java/org/apache/hive/streaming/HiveEndPoint.java
new file mode 100644
index 0000000..9a7292f
--- /dev/null
+++ streaming/src/java/org/apache/hive/streaming/HiveEndPoint.java
@@ -0,0 +1,792 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.LockComponentBuilder;
+import org.apache.hadoop.hive.metastore.LockRequestBuilder;
+import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeResponse;
+import org.apache.hadoop.hive.metastore.api.LockRequest;
+import org.apache.hadoop.hive.metastore.api.LockResponse;
+import org.apache.hadoop.hive.metastore.api.LockState;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
+import org.apache.hadoop.hive.ql.CommandNeedRetryException;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.session.SessionState;
+
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.thrift.TException;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * Information about the Hive table or partition to write to.
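+ *
+ * Typical usage: construct an endpoint, call newConnection() to obtain a
+ * StreamingConnection, fetch a TransactionBatch with a RecordWriter, then
+ * repeatedly beginNextTransaction(), write() and commit(), closing the batch
+ * and the connection when done.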
+ */
+public class HiveEndPoint {
+ public final String metaStoreUri;
+ public final String database;
+ public final String table;
+  public final ArrayList<String> partitionVals;
+ public final HiveConf conf;
+
+
+ static final private Log LOG = LogFactory.getLog(HiveEndPoint.class.getName());
+
+ public HiveEndPoint(String metaStoreUri
+          , String database, String table, List<String> partitionVals) throws ConnectionError {
+ this.metaStoreUri = metaStoreUri;
+ if(database==null) {
+ throw new IllegalArgumentException("Database cannot be null for HiveEndPoint");
+ }
+    this.database = database;
+    if (table == null) {
+      throw new IllegalArgumentException("Table cannot be null for HiveEndPoint");
+    }
+    this.table = table;
+    this.partitionVals = partitionVals == null ? new ArrayList<String>()
+            : new ArrayList<String>(partitionVals);
+ this.conf = createHiveConf(this.getClass(),metaStoreUri);
+ }
+
+ /**
+ * Acquire a new connection to MetaStore for streaming
+ * @param createPartIfNotExists If true, the partition specified in the endpoint
+ * will be auto created if it does not exist
+ * @return
+ * @throws ConnectionError if problem connecting
+ * @throws InvalidPartition if specified partition is not valid (createPartIfNotExists = false)
+ * @throws ImpersonationFailed if not able to impersonate 'proxyUser'
+ * @throws IOException if there was an I/O error when acquiring connection
+ * @throws PartitionCreationFailed if failed to create partition
+ * @throws InterruptedException
+ */
+ public StreamingConnection newConnection(final boolean createPartIfNotExists)
+ throws ConnectionError, InvalidPartition, InvalidTable, PartitionCreationFailed
+ , ImpersonationFailed , InterruptedException {
+ return newConnection(null, createPartIfNotExists);
+ }
+
+ //TODO: make this function public once proxyUser is fully supported
+ /**
+ * Acquire a new connection to MetaStore for streaming
+ * @param proxyUser User on whose behalf all hdfs and hive operations will be
+ * performed on this connection. Set it to null or empty string
+ * to connect as user of current process without impersonation.
+ * @param createPartIfNotExists If true, the partition specified in the endpoint
+ * will be auto created if it does not exist
+ * @return
+ * @throws ConnectionError if problem connecting
+ * @throws InvalidPartition if specified partition is not valid (createPartIfNotExists = false)
+ * @throws ImpersonationFailed if not able to impersonate 'proxyUser'
+ * @throws IOException if there was an I/O error when acquiring connection
+ * @throws PartitionCreationFailed if failed to create partition
+ * @throws InterruptedException
+ */
+ private StreamingConnection newConnection(final String proxyUser, final boolean createPartIfNotExists)
+ throws ConnectionError, InvalidPartition, InvalidTable, PartitionCreationFailed
+ , ImpersonationFailed , InterruptedException {
+ if (proxyUser ==null || proxyUser.trim().isEmpty() ) {
+ return newConnectionImpl(System.getProperty("user.name"), null, createPartIfNotExists);
+ }
+ final UserGroupInformation ugi = getUserGroupInfo(proxyUser);
+ try {
+ return ugi.doAs (
+              new PrivilegedExceptionAction<StreamingConnection>() {
+ @Override
+ public StreamingConnection run()
+ throws ConnectionError, InvalidPartition, InvalidTable
+ , PartitionCreationFailed {
+ return newConnectionImpl(proxyUser, ugi, createPartIfNotExists);
+ }
+ }
+ );
+ } catch (IOException e) {
+ throw new ImpersonationFailed("Failed to impersonate '" + proxyUser +
+ "' when acquiring connection", e);
+ }
+ }
+
+ private StreamingConnection newConnectionImpl(String proxyUser, UserGroupInformation ugi,
+ boolean createPartIfNotExists)
+ throws ConnectionError, InvalidPartition, InvalidTable
+ , PartitionCreationFailed {
+ return new ConnectionImpl(this, proxyUser, ugi, conf, createPartIfNotExists);
+ }
+
+ private static UserGroupInformation getUserGroupInfo(String proxyUser)
+ throws ImpersonationFailed {
+ try {
+ return UserGroupInformation.createProxyUser(
+ proxyUser, UserGroupInformation.getLoginUser());
+ } catch (IOException e) {
+ LOG.error("Unable to login as proxy user. Exception follows.", e);
+ throw new ImpersonationFailed(proxyUser,e);
+ }
+ }
+
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ HiveEndPoint endPoint = (HiveEndPoint) o;
+
+ if ( database != null
+ ? !database.equals(endPoint.database)
+ : endPoint.database != null ) {
+ return false;
+ }
+ if ( metaStoreUri != null
+ ? !metaStoreUri.equals(endPoint.metaStoreUri)
+ : endPoint.metaStoreUri != null ) {
+ return false;
+ }
+ if (!partitionVals.equals(endPoint.partitionVals)) {
+ return false;
+ }
+ if (table != null ? !table.equals(endPoint.table) : endPoint.table != null) {
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = metaStoreUri != null ? metaStoreUri.hashCode() : 0;
+ result = 31 * result + (database != null ? database.hashCode() : 0);
+ result = 31 * result + (table != null ? table.hashCode() : 0);
+ result = 31 * result + partitionVals.hashCode();
+ return result;
+ }
+
+ @Override
+ public String toString() {
+ return "{" +
+ "metaStoreUri='" + metaStoreUri + '\'' +
+ ", database='" + database + '\'' +
+ ", table='" + table + '\'' +
+ ", partitionVals=" + partitionVals + " }";
+ }
+
+
+ private static class ConnectionImpl implements StreamingConnection {
+ private final IMetaStoreClient msClient;
+ private final HiveEndPoint endPt;
+ private final String proxyUser;
+ private final UserGroupInformation ugi;
+
+ /**
+ *
+ * @param endPoint
+ * @param proxyUser can be null
+     * @param ugi of the proxy user. If ugi is null, impersonation of the proxy user is disabled
+ * @param conf
+ * @param createPart create the partition if it does not exist
+ * @throws ConnectionError if there is trouble connecting
+ * @throws InvalidPartition if specified partition does not exist (and createPart=false)
+ * @throws InvalidTable if specified table does not exist
+ * @throws PartitionCreationFailed if createPart=true and not able to create partition
+ */
+ private ConnectionImpl(HiveEndPoint endPoint, String proxyUser, UserGroupInformation ugi,
+ HiveConf conf, boolean createPart)
+ throws ConnectionError, InvalidPartition, InvalidTable
+ , PartitionCreationFailed {
+ this.proxyUser = proxyUser;
+ this.endPt = endPoint;
+ this.ugi = ugi;
+ this.msClient = getMetaStoreClient(endPoint, conf);
+ if(createPart && !endPoint.partitionVals.isEmpty()) {
+ createPartitionIfNotExists(endPoint, msClient, conf);
+ }
+ }
+
+ /**
+ * Close connection
+ */
+ @Override
+ public void close() {
+ if(ugi==null) {
+ msClient.close();
+ return;
+ }
+ try {
+ ugi.doAs (
+                new PrivilegedExceptionAction<Void>() {
+ @Override
+ public Void run() throws Exception {
+ msClient.close();
+ return null;
+ }
+ } );
+ } catch (IOException e) {
+ LOG.error("Error closing connection to " + endPt, e);
+ } catch (InterruptedException e) {
+ LOG.error("Interrupted when closing connection to " + endPt, e);
+ }
+ }
+
+
+ /**
+ * Acquires a new batch of transactions from Hive.
+ *
+ * @param numTransactions is a hint from client indicating how many transactions client needs.
+ * @param recordWriter Used to write record. The same writer instance can
+ * be shared with another TransactionBatch (to the same endpoint)
+ * only after the first TransactionBatch has been closed.
+ * Writer will be closed when the TransactionBatch is closed.
+ * @return
+ * @throws StreamingIOFailure if failed to create new RecordUpdater for batch
+ * @throws TransactionBatchUnAvailable if failed to acquire a new Transaction batch
+ * @throws ImpersonationFailed failed to run command as proxyUser
+ * @throws InterruptedException
+ */
+ public TransactionBatch fetchTransactionBatch(final int numTransactions,
+ final RecordWriter recordWriter)
+ throws StreamingIOFailure, TransactionBatchUnAvailable, ImpersonationFailed, InterruptedException {
+ if(ugi==null) {
+ return fetchTransactionBatchImpl(numTransactions, recordWriter);
+ }
+ try {
+ return ugi.doAs (
+                new PrivilegedExceptionAction<TransactionBatch>() {
+ @Override
+ public TransactionBatch run() throws StreamingIOFailure, TransactionBatchUnAvailable {
+ return fetchTransactionBatchImpl(numTransactions, recordWriter);
+ }
+ }
+ );
+ } catch (IOException e) {
+ throw new ImpersonationFailed("Failed impersonating proxy user '" + proxyUser +
+ "' when acquiring Transaction Batch on endPoint " + endPt, e);
+ }
+ }
+
+ private TransactionBatch fetchTransactionBatchImpl(int numTransactions,
+ RecordWriter recordWriter)
+ throws StreamingIOFailure, TransactionBatchUnAvailable {
+ return new TransactionBatchImpl(proxyUser, ugi, endPt, numTransactions, msClient, recordWriter);
+ }
+
+
+ private static void createPartitionIfNotExists(HiveEndPoint ep,
+ IMetaStoreClient msClient, HiveConf conf)
+ throws InvalidTable, PartitionCreationFailed {
+ if(ep.partitionVals.isEmpty()) {
+ return;
+ }
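+      // The partition is added by running "ALTER TABLE ... ADD IF NOT EXISTS PARTITION ..."
+      // through an embedded Driver/CliSessionState.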
+ SessionState state = SessionState.start(new CliSessionState(conf));
+ Driver driver = new Driver(conf);
+
+ try {
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Attempting to create partition (if not existent) " + ep);
+ }
+
+        List<FieldSchema> partKeys = msClient.getTable(ep.database, ep.table)
+ .getPartitionKeys();
+ runDDL(driver, "use " + ep.database);
+ String query = "alter table " + ep.table + " add if not exists partition "
+ + partSpecStr(partKeys, ep.partitionVals);
+ runDDL(driver, query);
+ } catch (MetaException e) {
+ LOG.error("Failed to create partition : " + ep, e);
+ throw new PartitionCreationFailed(ep, e);
+ } catch (NoSuchObjectException e) {
+ LOG.error("Failed to create partition : " + ep, e);
+ throw new InvalidTable(ep.database, ep.table);
+ } catch (TException e) {
+ LOG.error("Failed to create partition : " + ep, e);
+ throw new PartitionCreationFailed(ep, e);
+ } catch (QueryFailedException e) {
+ LOG.error("Failed to create partition : " + ep, e);
+ throw new PartitionCreationFailed(ep, e);
+ } finally {
+ driver.close();
+ try {
+ state.close();
+ } catch (IOException e) {
+ LOG.warn("Error closing SessionState used to run Hive DDL.");
+ }
+ }
+ }
+
+ private static boolean runDDL(Driver driver, String sql) throws QueryFailedException {
+ int retryCount = 1; // # of times to retry if first attempt fails
+ for(int attempt=0; attempt<=retryCount; ++attempt) {
+ try {
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Running Hive Query: "+ sql);
+ }
+ driver.run(sql);
+ return true;
+ } catch (CommandNeedRetryException e) {
+ if(attempt==retryCount) {
+ throw new QueryFailedException(sql, e);
+ }
+ continue;
+ }
+ } // for
+ return false;
+ }
+
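+    // Renders the partition spec as ( key1='val1', key2='val2' ) for the ADD PARTITION DDL.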
+    private static String partSpecStr(List<FieldSchema> partKeys, ArrayList<String> partVals) {
+      if (partKeys.size() != partVals.size()) {
+        throw new IllegalArgumentException("Partition values: " + partVals
+                + " do not match the partition keys in the table: " + partKeys);
+ }
+ StringBuffer buff = new StringBuffer(partKeys.size()*20);
+ buff.append(" ( ");
+ int i=0;
+ for(FieldSchema schema : partKeys) {
+ buff.append(schema.getName());
+ buff.append("='");
+ buff.append(partVals.get(i));
+ buff.append("'");
+ if(i!=partKeys.size()-1) {
+ buff.append(",");
+ }
+ ++i;
+ }
+ buff.append(" )");
+ return buff.toString();
+ }
+
+ private static IMetaStoreClient getMetaStoreClient(HiveEndPoint endPoint, HiveConf conf)
+ throws ConnectionError {
+
+ if(endPoint.metaStoreUri!= null) {
+ conf.setVar(HiveConf.ConfVars.METASTOREURIS, endPoint.metaStoreUri);
+ }
+
+ try {
+ return Hive.get(conf).getMSC();
+// return new HiveMetaStoreClient(conf);
+ } catch (MetaException e) {
+ throw new ConnectionError("Error connecting to Hive Metastore URI: "
+ + endPoint.metaStoreUri, e);
+ } catch (HiveException e) {
+ throw new ConnectionError("Error connecting to Hive Metastore URI: "
+ + endPoint.metaStoreUri, e);
+ }
+ }
+
+
+ } // class ConnectionImpl
+
+ private static class TransactionBatchImpl implements TransactionBatch {
+ private final String proxyUser;
+ private final UserGroupInformation ugi;
+ private final HiveEndPoint endPt;
+ private final IMetaStoreClient msClient;
+ private final RecordWriter recordWriter;
+    private final List<Long> txnIds;
+
+ private int currentTxnIndex;
+ private final String partNameForLock;
+
+ private TxnState state;
+ private LockRequest lockRequest = null;
+
+ /**
+ * Represents a batch of transactions acquired from MetaStore
+ *
+ * @param proxyUser
+ * @param ugi
+ * @param endPt
+ * @param numTxns
+ * @param msClient
+ * @param recordWriter
+ * @throws StreamingIOFailure if failed to create new RecordUpdater for batch
+ * @throws TransactionBatchUnAvailable if failed to acquire a new Transaction batch
+ */
+ private TransactionBatchImpl(String proxyUser, UserGroupInformation ugi, HiveEndPoint endPt
+ , int numTxns, IMetaStoreClient msClient, RecordWriter recordWriter)
+ throws StreamingIOFailure, TransactionBatchUnAvailable {
+ try {
+ if( endPt.partitionVals!=null && !endPt.partitionVals.isEmpty() ) {
+ Table tableObj = msClient.getTable(endPt.database, endPt.table);
+          List<FieldSchema> partKeys = tableObj.getPartitionKeys();
+ partNameForLock = Warehouse.makePartName(partKeys, endPt.partitionVals);
+ } else {
+ partNameForLock = null;
+ }
+ this.proxyUser = proxyUser;
+ this.ugi = ugi;
+ this.endPt = endPt;
+ this.msClient = msClient;
+ this.recordWriter = recordWriter;
+ this.txnIds = msClient.openTxns(proxyUser, numTxns).getTxn_ids();
+ this.currentTxnIndex = -1;
+ this.state = TxnState.INACTIVE;
+ recordWriter.newBatch(txnIds.get(0), txnIds.get(txnIds.size()-1));
+ } catch (TException e) {
+ throw new TransactionBatchUnAvailable(endPt, e);
+ }
+ }
+
+ @Override
+ public String toString() {
+ if(txnIds==null || txnIds.isEmpty()) {
+ return "{}";
+ }
+ return "TxnIds=[" + txnIds.get(0) + ".." + txnIds.get(txnIds.size()-1) + "] on endPoint= " + endPt;
+ }
+
+ /**
+ * Activate the next available transaction in the current transaction batch
+ * @throws TransactionError failed to switch to next transaction
+ */
+ @Override
+ public void beginNextTransaction() throws TransactionError, ImpersonationFailed,
+ InterruptedException {
+ if(ugi==null) {
+ beginNextTransactionImpl();
+ return;
+ }
+ try {
+ ugi.doAs (
+                new PrivilegedExceptionAction<Void>() {
+ @Override
+ public Void run() throws TransactionError {
+ beginNextTransactionImpl();
+ return null;
+ }
+ }
+ );
+ } catch (IOException e) {
+ throw new ImpersonationFailed("Failed impersonating proxyUser '" + proxyUser +
+ "' when switch to next Transaction for endPoint :" + endPt, e);
+ }
+ }
+
+ private void beginNextTransactionImpl() throws TransactionError {
+      if (currentTxnIndex + 1 >= txnIds.size()) {
+        throw new InvalidTrasactionState("No more transactions available in" +
+                " current batch for end point : " + endPt);
+      }
+ ++currentTxnIndex;
+ lockRequest = createLockRequest(endPt, partNameForLock, proxyUser, getCurrentTxnId());
+ try {
+ LockResponse res = msClient.lock(lockRequest);
+ if(res.getState() != LockState.ACQUIRED) {
+ throw new TransactionError("Unable to acquire lock on " + endPt);
+ }
+ } catch (TException e) {
+ throw new TransactionError("Unable to acquire lock on " + endPt, e);
+ }
+
+ state = TxnState.OPEN;
+ }
+
+ /**
+ * Get Id of currently open transaction
+ * @return
+ */
+ @Override
+ public Long getCurrentTxnId() {
+ return txnIds.get(currentTxnIndex);
+ }
+
+ /**
+     * Get the state of the current transaction
+ * @return
+ */
+ @Override
+ public TxnState getCurrentTransactionState() {
+ return state;
+ }
+
+ /**
+     * Remaining transactions are those in the batch that have not yet been begun.
+     * The currently active transaction is not counted as remaining.
+     * @return number of transactions remaining in this batch
+ */
+ @Override
+ public int remainingTransactions() {
+ if(currentTxnIndex>=0) {
+ return txnIds.size() - currentTxnIndex -1;
+ }
+ return txnIds.size();
+ }
+
+
+ /**
+ * Write record using RecordWriter
+ * @param record the data to be written
+ * @throws StreamingIOFailure I/O failure
+ * @throws SerializationError serialization error
+ * @throws ImpersonationFailed error writing on behalf of proxyUser
+ * @throws InterruptedException
+ */
+ @Override
+ public void write(final byte[] record)
+ throws StreamingIOFailure, SerializationError, InterruptedException,
+ ImpersonationFailed {
+ if(ugi==null) {
+ writeImpl(record);
+ return;
+ }
+ try {
+ ugi.doAs (
+                new PrivilegedExceptionAction<Void>() {
+ @Override
+ public Void run() throws StreamingIOFailure, SerializationError {
+ writeImpl(record);
+ return null;
+ }
+ }
+ );
+ } catch (IOException e) {
+ throw new ImpersonationFailed("Failed impersonating proxy user '" + proxyUser +
+ "' when writing to endPoint :" + endPt + ". Transaction Id: "
+ + getCurrentTxnId(), e);
+ }
+ }
+
+ public void writeImpl(byte[] record)
+ throws StreamingIOFailure, SerializationError {
+ recordWriter.write(getCurrentTxnId(), record);
+ }
+
+
+ /**
+ * Write records using RecordWriter
+ * @param records collection of rows to be written
+ * @throws StreamingIOFailure I/O failure
+ * @throws SerializationError serialization error
+ * @throws ImpersonationFailed error writing on behalf of proxyUser
+ * @throws InterruptedException
+ */
+ @Override
+    public void write(final Collection<byte[]> records)
+ throws StreamingIOFailure, SerializationError, InterruptedException,
+ ImpersonationFailed {
+ if(ugi==null) {
+ writeImpl(records);
+ return;
+ }
+ try {
+ ugi.doAs (
+                new PrivilegedExceptionAction<Void>() {
+ @Override
+ public Void run() throws StreamingIOFailure, SerializationError {
+ writeImpl(records);
+ return null;
+ }
+ }
+ );
+ } catch (IOException e) {
+ throw new ImpersonationFailed("Failed impersonating proxyUser '" + proxyUser +
+ "' when writing to endPoint :" + endPt + ". Transaction Id: "
+ + getCurrentTxnId(), e);
+ }
+ }
+
+    private void writeImpl(Collection<byte[]> records)
+ throws StreamingIOFailure, SerializationError {
+ for(byte[] record : records) {
+ writeImpl(record);
+ }
+ }
+
+
+ /**
+ * Commit the currently open transaction
+ * @throws TransactionError
+ * @throws StreamingIOFailure if flushing records failed
+     * @throws ImpersonationFailed if not able to commit on behalf of the proxy user
+ * @throws InterruptedException
+ */
+ @Override
+ public void commit() throws TransactionError, StreamingIOFailure,
+ ImpersonationFailed, InterruptedException {
+ if(ugi==null) {
+ commitImpl();
+ return;
+ }
+ try {
+ ugi.doAs (
+                new PrivilegedExceptionAction<Void>() {
+ @Override
+ public Void run() throws TransactionError, StreamingIOFailure {
+ commitImpl();
+ return null;
+ }
+ }
+ );
+ } catch (IOException e) {
+ throw new ImpersonationFailed("Failed impersonating proxy user '" + proxyUser +
+ "' when committing Txn on endPoint :" + endPt + ". Transaction Id: "
+ + getCurrentTxnId(), e);
+ }
+
+ }
+
+ private void commitImpl() throws TransactionError, StreamingIOFailure {
+ try {
+ recordWriter.flush();
+ msClient.commitTxn(txnIds.get(currentTxnIndex));
+ state = TxnState.COMMITTED;
+ } catch (NoSuchTxnException e) {
+ throw new TransactionError("Invalid transaction id : "
+ + getCurrentTxnId(), e);
+ } catch (TxnAbortedException e) {
+ throw new TransactionError("Aborted transaction cannot be committed"
+ , e);
+ } catch (TException e) {
+ throw new TransactionError("Unable to commit transaction"
+ + getCurrentTxnId(), e);
+ }
+ }
+
+ /**
+ * Abort the currently open transaction
+ * @throws TransactionError
+ */
+ @Override
+ public void abort() throws TransactionError, StreamingIOFailure
+ , ImpersonationFailed, InterruptedException {
+ if(ugi==null) {
+ abortImpl();
+ return;
+ }
+ try {
+ ugi.doAs (
+                new PrivilegedExceptionAction<Void>() {
+ @Override
+ public Void run() throws TransactionError, StreamingIOFailure {
+ abortImpl();
+ return null;
+ }
+ }
+ );
+ } catch (IOException e) {
+ throw new ImpersonationFailed("Failed impersonating proxy user '" + proxyUser +
+ "' when aborting Txn on endPoint :" + endPt + ". Transaction Id: "
+ + getCurrentTxnId(), e);
+ }
+ }
+
+ private void abortImpl() throws TransactionError, StreamingIOFailure {
+ try {
+ recordWriter.clear();
+ msClient.rollbackTxn(getCurrentTxnId());
+ state = TxnState.ABORTED;
+ } catch (NoSuchTxnException e) {
+ throw new TransactionError("Unable to abort invalid transaction id : "
+ + getCurrentTxnId(), e);
+ } catch (TException e) {
+ throw new TransactionError("Unable to abort transaction id : "
+ + getCurrentTxnId(), e);
+ }
+ }
+
+ @Override
+ public void heartbeat() throws StreamingException, HeartBeatFailure {
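+      // Heartbeats every transaction from the current one through the end of the
+      // batch, so that their transactions and locks are not timed out.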
+ Long first = txnIds.get(currentTxnIndex);
+ Long last = txnIds.get(txnIds.size()-1);
+ try {
+ HeartbeatTxnRangeResponse resp = msClient.heartbeatTxnRange(first, last);
+ if(!resp.getAborted().isEmpty() || !resp.getNosuch().isEmpty()) {
+ throw new HeartBeatFailure(resp.getAborted(), resp.getNosuch());
+ }
+ } catch (TException e) {
+ throw new StreamingException("Failure to heartbeat on ids (" + first + ".."
+ + last + ") on end point : " + endPt );
+ }
+ }
+
+ /**
+ * Close the TransactionBatch
+ * @throws StreamingIOFailure I/O failure when closing transaction batch
+ */
+ @Override
+ public void close() throws StreamingIOFailure, ImpersonationFailed, InterruptedException {
+ if(ugi==null) {
+ state = TxnState.INACTIVE;
+ recordWriter.closeBatch();
+ return;
+ }
+ try {
+ ugi.doAs (
+                new PrivilegedExceptionAction<Void>() {
+ @Override
+ public Void run() throws StreamingIOFailure {
+ state = TxnState.INACTIVE;
+ recordWriter.closeBatch();
+ return null;
+ }
+ }
+ );
+ } catch (IOException e) {
+ throw new ImpersonationFailed("Failed impersonating proxy user '" + proxyUser +
+ "' when closing Txn Batch on endPoint :" + endPt, e);
+ }
+ }
+
+ private static LockRequest createLockRequest(final HiveEndPoint hiveEndPoint,
+ String partNameForLock, String user, long txnId) {
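+      // Builds a shared lock on the table (and partition, if any), tied to the given transaction id.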
+ LockRequestBuilder rqstBuilder = new LockRequestBuilder();
+ rqstBuilder.setUser(user);
+ rqstBuilder.setTransactionId(txnId);
+
+ LockComponentBuilder lockCompBuilder = new LockComponentBuilder()
+ .setDbName(hiveEndPoint.database)
+ .setTableName(hiveEndPoint.table)
+ .setShared();
+ if(partNameForLock!=null && !partNameForLock.isEmpty() ) {
+ lockCompBuilder.setPartitionName(partNameForLock);
+ }
+ rqstBuilder.addLockComponent(lockCompBuilder.build());
+
+ return rqstBuilder.build();
+ }
+ } // class TransactionBatchImpl
+
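+  // Streaming requires the DbTxnManager (ACID) and concurrency support; the metastore URI is set if supplied.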
+  static HiveConf createHiveConf(Class<?> clazz, String metaStoreUri) {
+ HiveConf conf = new HiveConf(clazz);
+ conf.setVar(HiveConf.ConfVars.HIVE_TXN_MANAGER,
+ "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager");
+ conf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, true);
+ conf.setBoolVar(HiveConf.ConfVars.METASTORE_EXECUTE_SET_UGI, true);
+ if(metaStoreUri!= null) {
+ conf.setVar(HiveConf.ConfVars.METASTOREURIS, metaStoreUri);
+ }
+ return conf;
+ }
+
+} // class HiveEndPoint
diff --git streaming/src/java/org/apache/hive/streaming/ImpersonationFailed.java streaming/src/java/org/apache/hive/streaming/ImpersonationFailed.java
new file mode 100644
index 0000000..6526124
--- /dev/null
+++ streaming/src/java/org/apache/hive/streaming/ImpersonationFailed.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+public class ImpersonationFailed extends StreamingException {
+ public ImpersonationFailed(String username, Exception e) {
+ super("Failed to impersonate user " + username, e);
+ }
+}
diff --git streaming/src/java/org/apache/hive/streaming/InvalidColumn.java streaming/src/java/org/apache/hive/streaming/InvalidColumn.java
new file mode 100644
index 0000000..08d8fdf
--- /dev/null
+++ streaming/src/java/org/apache/hive/streaming/InvalidColumn.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+public class InvalidColumn extends StreamingException {
+
+ public InvalidColumn(String msg) {
+ super(msg);
+ }
+}
diff --git streaming/src/java/org/apache/hive/streaming/InvalidPartition.java streaming/src/java/org/apache/hive/streaming/InvalidPartition.java
new file mode 100644
index 0000000..94f6b6e
--- /dev/null
+++ streaming/src/java/org/apache/hive/streaming/InvalidPartition.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+public class InvalidPartition extends StreamingException {
+
+ public InvalidPartition(String partitionName, String partitionValue) {
+ super("Invalid partition: Name=" + partitionName +
+ ", Value=" + partitionValue);
+ }
+
+}
diff --git streaming/src/java/org/apache/hive/streaming/InvalidTable.java streaming/src/java/org/apache/hive/streaming/InvalidTable.java
new file mode 100644
index 0000000..d4552ec
--- /dev/null
+++ streaming/src/java/org/apache/hive/streaming/InvalidTable.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+public class InvalidTable extends StreamingException {
+
+ private static String makeMsg(String db, String table) {
+ return "Invalid table db:" + db + ", table:" + table;
+ }
+
+ public InvalidTable(String db, String table, Exception cause) {
+ super(makeMsg(db,table), cause);
+ }
+
+ public InvalidTable(String db, String table) {
+ super(makeMsg(db,table), null);
+ }
+}
diff --git streaming/src/java/org/apache/hive/streaming/InvalidTrasactionState.java streaming/src/java/org/apache/hive/streaming/InvalidTrasactionState.java
new file mode 100644
index 0000000..ed5ccfc
--- /dev/null
+++ streaming/src/java/org/apache/hive/streaming/InvalidTrasactionState.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+public class InvalidTrasactionState extends TransactionError {
+ public InvalidTrasactionState(String msg) {
+ super(msg);
+ }
+
+ public InvalidTrasactionState(String msg, Exception e) {
+ super(msg,e);
+ }
+}
diff --git streaming/src/java/org/apache/hive/streaming/PartitionCreationFailed.java streaming/src/java/org/apache/hive/streaming/PartitionCreationFailed.java
new file mode 100644
index 0000000..82fde58
--- /dev/null
+++ streaming/src/java/org/apache/hive/streaming/PartitionCreationFailed.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+public class PartitionCreationFailed extends StreamingException {
+ public PartitionCreationFailed(HiveEndPoint endPoint, Exception cause) {
+ super("Failed to create partition " + endPoint, cause);
+ }
+
+ public PartitionCreationFailed(String msg, Exception cause) {
+ super(msg, cause);
+ }
+}
diff --git streaming/src/java/org/apache/hive/streaming/QueryFailedException.java streaming/src/java/org/apache/hive/streaming/QueryFailedException.java
new file mode 100644
index 0000000..7cbde61
--- /dev/null
+++ streaming/src/java/org/apache/hive/streaming/QueryFailedException.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+import org.apache.hadoop.hive.ql.CommandNeedRetryException;
+
+public class QueryFailedException extends StreamingException {
+ String query;
+ public QueryFailedException(String query, CommandNeedRetryException e) {
+ super("Query failed: " + query + ". Due to :" + e.getMessage(), e);
+ this.query = query;
+ }
+}
diff --git streaming/src/java/org/apache/hive/streaming/RecordWriter.java streaming/src/java/org/apache/hive/streaming/RecordWriter.java
new file mode 100644
index 0000000..528203e
--- /dev/null
+++ streaming/src/java/org/apache/hive/streaming/RecordWriter.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+
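+/**
+ * Writes records to the files backing a TransactionBatch. Implementations are
+ * handed the transaction id with each record and are notified when a new batch
+ * starts and when it is closed.
+ */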
+public interface RecordWriter {
+
+ /** Write using RecordUpdater
+ *
+ * @param transactionId
+ * @param record
+ * @throws StreamingIOFailure on I/O errors
+ * @throws SerializationError if failed serializing record
+ */
+ public void write(long transactionId, byte[] record) throws StreamingIOFailure, SerializationError;
+
+ /** Flush records from buffer. Invoked by TransactionBatch.commit() */
+ public void flush() throws StreamingIOFailure;
+
+  /** Clear buffered writes. Invoked by TransactionBatch.abort() */
+ public void clear() throws StreamingIOFailure;
+
+ /** Acquire a new RecordUpdater. Invoked when
+ * StreamingConnection.fetchTransactionBatch() is called */
+ public void newBatch(Long minTxnId, Long maxTxnID) throws StreamingIOFailure;
+
+ /** Close the RecordUpdater. Invoked by TransactionBatch.close() */
+ public void closeBatch() throws StreamingIOFailure;
+}
diff --git streaming/src/java/org/apache/hive/streaming/SerializationError.java streaming/src/java/org/apache/hive/streaming/SerializationError.java
new file mode 100644
index 0000000..6844aca
--- /dev/null
+++ streaming/src/java/org/apache/hive/streaming/SerializationError.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+
+public class SerializationError extends StreamingException {
+ public SerializationError(String msg, Exception e) {
+ super(msg,e);
+ }
+}
diff --git streaming/src/java/org/apache/hive/streaming/StreamingConnection.java streaming/src/java/org/apache/hive/streaming/StreamingConnection.java
new file mode 100644
index 0000000..8480ea9
--- /dev/null
+++ streaming/src/java/org/apache/hive/streaming/StreamingConnection.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+/**
+ * Represents a connection to a HiveEndPoint. Used to acquire transaction batches.
+ */
+public interface StreamingConnection {
+
+ /**
+ * Acquires a new batch of transactions from Hive.
+ *
+ * @param numTransactionsHint a hint from the client indicating how many transactions it needs
+ * @param writer used to write records. The same writer instance can
+ *               be shared with another TransactionBatch (to the same endpoint)
+ *               only after the first TransactionBatch has been closed.
+ *               The writer will be closed when the TransactionBatch is closed.
+ * @return a batch of transactions
+ * @throws ConnectionError
+ * @throws InvalidPartition
+ * @throws StreamingException
+ */
+ public TransactionBatch fetchTransactionBatch(int numTransactionsHint,
+ RecordWriter writer)
+ throws ConnectionError, StreamingException, InterruptedException;
+
+ /**
+ * Close connection
+ */
+ public void close();
+
+}
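To illustrate the writer-sharing rule spelled out above (the same RecordWriter may back a second TransactionBatch only after the first one is closed), a rough usage sketch follows; the endpoint values are made up, and the pattern mirrors TestStreaming later in this patch.

    import org.apache.hive.streaming.DelimitedInputWriter;
    import org.apache.hive.streaming.HiveEndPoint;
    import org.apache.hive.streaming.StreamingConnection;
    import org.apache.hive.streaming.TransactionBatch;

    // Hypothetical usage: one connection, one writer, two transaction batches in sequence.
    class ConnectionUsageSketch {
      static void run() throws Exception {
        HiveEndPoint endPt = new HiveEndPoint("thrift://localhost:9083", "testing", "alerts", null);
        StreamingConnection conn = endPt.newConnection(true);    // true: auto-create partition if needed
        DelimitedInputWriter writer = new DelimitedInputWriter(new String[]{"id", "msg"}, ",", endPt);

        TransactionBatch first = conn.fetchTransactionBatch(10, writer);
        first.beginNextTransaction();
        first.write("1,Hello streaming".getBytes());
        first.commit();
        first.close();                                           // frees the writer for reuse

        TransactionBatch second = conn.fetchTransactionBatch(10, writer); // legal only after first.close()
        second.beginNextTransaction();
        second.write("2,Welcome to streaming".getBytes());
        second.commit();
        second.close();

        conn.close();
      }
    }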
diff --git streaming/src/java/org/apache/hive/streaming/StreamingException.java streaming/src/java/org/apache/hive/streaming/StreamingException.java
new file mode 100644
index 0000000..9486d6d
--- /dev/null
+++ streaming/src/java/org/apache/hive/streaming/StreamingException.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+public class StreamingException extends Exception {
+ public StreamingException(String msg, Exception cause) {
+ super(msg, cause);
+ }
+ public StreamingException(String msg) {
+ super(msg);
+ }
+}
diff --git streaming/src/java/org/apache/hive/streaming/StreamingIOFailure.java streaming/src/java/org/apache/hive/streaming/StreamingIOFailure.java
new file mode 100644
index 0000000..fb09488
--- /dev/null
+++ streaming/src/java/org/apache/hive/streaming/StreamingIOFailure.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+
+public class StreamingIOFailure extends StreamingException {
+
+ public StreamingIOFailure(String msg, Exception cause) {
+ super(msg, cause);
+ }
+
+ public StreamingIOFailure(String msg) {
+ super(msg);
+ }
+}
diff --git streaming/src/java/org/apache/hive/streaming/TransactionBatch.java streaming/src/java/org/apache/hive/streaming/TransactionBatch.java
new file mode 100644
index 0000000..e9e42fa
--- /dev/null
+++ streaming/src/java/org/apache/hive/streaming/TransactionBatch.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.LockRequest;
+import org.apache.hadoop.hive.metastore.api.LockResponse;
+import org.apache.hadoop.hive.metastore.api.LockState;
+import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
+import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
+import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
+import org.apache.hadoop.hive.ql.io.RecordUpdater;
+import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.thrift.TException;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * Represents a set of Transactions returned by Hive. Supports opening, writing to
+ * and committing/aborting each transaction. The interface is designed to ensure
+ * transactions in a batch are used up sequentially. Multiple transaction batches can be
+ * used (initialized with separate RecordWriters) for concurrent streaming.
+ *
+ */
+public interface TransactionBatch {
+ public enum TxnState {INACTIVE, OPEN, COMMITTED, ABORTED }
+
+ /**
+ * Activate the next available transaction in the current transaction batch
+ * @throws StreamingException
+ */
+ public void beginNextTransaction() throws StreamingException, InterruptedException;
+
+ /**
+ * Get Id of currently open transaction
+ * @return transaction id
+ */
+ public Long getCurrentTxnId();
+
+ /**
+ * Get the state of the current transaction
+ * @return the current transaction state
+ */
+ public TxnState getCurrentTransactionState();
+
+ /**
+ * Commit the currently open transaction
+ * @throws StreamingException
+ */
+ public void commit() throws StreamingException, InterruptedException;
+
+ /**
+ * Abort the currently open transaction
+ * @throws StreamingException
+ */
+ public void abort() throws StreamingException, InterruptedException;
+
+ /**
+ * Remaining transactions are those that have not yet been opened, committed, or aborted.
+ * The currently open transaction is not counted among the remaining txns.
+ * @return number of transactions remaining in this batch.
+ */
+ public int remainingTransactions();
+
+
+ /**
+ * Write record using RecordWriter
+ * @param record the data to be written
+ * @throws ConnectionError
+ * @throws IOException
+ * @throws StreamingException
+ */
+ public void write(byte[] record) throws StreamingException, InterruptedException;
+
+ /**
+ * Write records using RecordWriter
+ * @param records collection of rows to be written
+ * @throws ConnectionError
+ * @throws IOException
+ * @throws StreamingException
+ */
+ public void write(Collection<byte[]> records) throws StreamingException, InterruptedException;
+
+
+ /**
+ * Issues a heartbeat to the Hive metastore on the current and remaining txn ids
+ * to keep them from expiring
+ * @throws StreamingException
+ * @throws HeartBeatFailure
+ */
+ public void heartbeat() throws StreamingException, HeartBeatFailure;
+
+ /**
+ * Close the TransactionBatch
+ * @throws StreamingException
+ */
+ public void close() throws StreamingException, InterruptedException;
+}
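The sequential-use contract described above boils down to the loop sketched below, drawn from the test and tester code later in this patch; the class and method names are illustrative, and the error handling is only one plausible policy.

    import org.apache.hive.streaming.ConnectionError;
    import org.apache.hive.streaming.RecordWriter;
    import org.apache.hive.streaming.StreamingConnection;
    import org.apache.hive.streaming.StreamingException;
    import org.apache.hive.streaming.TransactionBatch;

    // Hypothetical driver: uses up the transactions of one batch in order.
    class TransactionBatchLoopSketch {
      static void drainOneBatch(StreamingConnection conn, RecordWriter writer, byte[] record)
          throws ConnectionError, StreamingException, InterruptedException {
        TransactionBatch batch = conn.fetchTransactionBatch(100, writer);
        try {
          while (batch.remainingTransactions() > 0) {
            batch.beginNextTransaction();      // state becomes OPEN
            try {
              batch.write(record);             // one or more writes per transaction
              batch.commit();                  // state becomes COMMITTED
            } catch (StreamingException e) {
              batch.abort();                   // state becomes ABORTED, buffered writes discarded
              throw e;
            }
          }
        } finally {
          batch.close();                       // state becomes INACTIVE; the writer is closed
        }
      }
    }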
diff --git streaming/src/java/org/apache/hive/streaming/TransactionBatchUnAvailable.java streaming/src/java/org/apache/hive/streaming/TransactionBatchUnAvailable.java
new file mode 100644
index 0000000..01883da
--- /dev/null
+++ streaming/src/java/org/apache/hive/streaming/TransactionBatchUnAvailable.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+public class TransactionBatchUnAvailable extends StreamingException {
+ public TransactionBatchUnAvailable(HiveEndPoint ep, Exception e) {
+ super("Unable to acquire transaction batch on end point: " + ep, e);
+ }
+}
diff --git streaming/src/java/org/apache/hive/streaming/TransactionError.java streaming/src/java/org/apache/hive/streaming/TransactionError.java
new file mode 100644
index 0000000..1acb4cc
--- /dev/null
+++ streaming/src/java/org/apache/hive/streaming/TransactionError.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+public class TransactionError extends StreamingException {
+ public TransactionError(String msg, Exception e) {
+ super(msg, e);
+ }
+
+ public TransactionError(String msg) {
+ super(msg);
+ }
+}
diff --git streaming/src/test/org/apache/hive/streaming/StreamingIntegrationTester.java streaming/src/test/org/apache/hive/streaming/StreamingIntegrationTester.java
new file mode 100644
index 0000000..5864783
--- /dev/null
+++ streaming/src/test/org/apache/hive/streaming/StreamingIntegrationTester.java
@@ -0,0 +1,353 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.streaming;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.cli.Parser;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.common.LogUtils;
+import org.apache.hadoop.util.StringUtils;
+
+import java.util.Arrays;
+import java.util.Random;
+
+/**
+ * A standalone utility to write data into the streaming ingest interface.
+ */
+public class StreamingIntegrationTester {
+
+ static final private Log LOG = LogFactory.getLog(StreamingIntegrationTester.class.getName());
+
+ public static void main(String[] args) {
+
+ try {
+ LogUtils.initHiveLog4j();
+ } catch (LogUtils.LogInitializationException e) {
+ System.err.println("Unable to initialize log4j " + StringUtils.stringifyException(e));
+ System.exit(-1);
+ }
+
+ Options options = new Options();
+
+ options.addOption(OptionBuilder
+ .hasArg()
+ .withArgName("abort-pct")
+ .withDescription("Percentage of transactions to abort, defaults to 5")
+ .withLongOpt("abortpct")
+ .create('a'));
+
+ options.addOption(OptionBuilder
+ .hasArgs()
+ .withArgName("column-names")
+ .withDescription("column names of table to write to")
+ .withLongOpt("columns")
+ .withValueSeparator(',')
+ .isRequired()
+ .create('c'));
+
+ options.addOption(OptionBuilder
+ .hasArg()
+ .withArgName("database")
+ .withDescription("Database of table to write to")
+ .withLongOpt("database")
+ .isRequired()
+ .create('d'));
+
+// options.addOption(OptionBuilder
+// .hasArg()
+// .withArgName("user")
+// .withDescription("User to impersonate")
+// .withLongOpt("user")
+// .create('u'));
+
+ options.addOption(OptionBuilder
+ .hasArg()
+ .withArgName("frequency")
+ .withDescription("How often to commit a transaction, in seconds, defaults to 1")
+ .withLongOpt("frequency")
+ .create('f'));
+
+ options.addOption(OptionBuilder
+ .hasArg()
+ .withArgName("iterations")
+ .withDescription("Number of batches to write, defaults to 10")
+ .withLongOpt("num-batches")
+ .create('i'));
+
+ options.addOption(OptionBuilder
+ .hasArg()
+ .withArgName("metastore-uri")
+ .withDescription("URI of Hive metastore")
+ .withLongOpt("metastore-uri")
+ .isRequired()
+ .create('m'));
+
+ options.addOption(OptionBuilder
+ .hasArg()
+ .withArgName("num_transactions")
+ .withDescription("Number of transactions per batch, defaults to 100")
+ .withLongOpt("num-txns")
+ .create('n'));
+
+ options.addOption(OptionBuilder
+ .hasArgs()
+ .withArgName("partition-values")
+ .withDescription("partition values, must be provided in order of partition columns, " +
+ "if not provided table is assumed to not be partitioned")
+ .withLongOpt("partition")
+ .withValueSeparator(',')
+ .create('p'));
+
+ options.addOption(OptionBuilder
+ .hasArg()
+ .withArgName("records-per-transaction")
+ .withDescription("records to write in each transaction, defaults to 100")
+ .withLongOpt("records-per-txn")
+ .withValueSeparator(',')
+ .create('r'));
+
+ options.addOption(OptionBuilder
+ .hasArgs()
+ .withArgName("column-types")
+ .withDescription("column types, valid values are string, int, float, decimal, date, " +
+ "datetime")
+ .withLongOpt("schema")
+ .withValueSeparator(',')
+ .isRequired()
+ .create('s'));
+
+ options.addOption(OptionBuilder
+ .hasArg()
+ .withArgName("table")
+ .withDescription("Table to write to")
+ .withLongOpt("table")
+ .isRequired()
+ .create('t'));
+
+ options.addOption(OptionBuilder
+ .hasArg()
+ .withArgName("num-writers")
+ .withDescription("Number of writers to create, defaults to 2")
+ .withLongOpt("writers")
+ .create('w'));
+
+ options.addOption(OptionBuilder
+ .hasArg(false)
+ .withArgName("pause")
+ .withDescription("pause for keyboard input on every commit and batch close. disabled by default")
+ .withLongOpt("pause")
+ .create('x'));
+
+
+ Parser parser = new GnuParser();
+ CommandLine cmdline = null;
+ try {
+ cmdline = parser.parse(options, args);
+ } catch (ParseException e) {
+ usage(options);
+ }
+
+ boolean pause = cmdline.hasOption('x');
+// String user = cmdline.getOptionValue('u');
+ String db = cmdline.getOptionValue('d');
+ String table = cmdline.getOptionValue('t');
+ String uri = cmdline.getOptionValue('m');
+ int txnsPerBatch = Integer.valueOf(cmdline.getOptionValue('n', "100"));
+ int writers = Integer.valueOf(cmdline.getOptionValue('w', "2"));
+ int batches = Integer.valueOf(cmdline.getOptionValue('i', "10"));
+ int recordsPerTxn = Integer.valueOf(cmdline.getOptionValue('r', "100"));
+ int frequency = Integer.valueOf(cmdline.getOptionValue('f', "1"));
+ int ap = Integer.valueOf(cmdline.getOptionValue('a', "5"));
+ float abortPct = ((float)ap) / 100.0f;
+ String[] partVals = cmdline.getOptionValues('p');
+ String[] cols = cmdline.getOptionValues('c');
+ String[] types = cmdline.getOptionValues('s');
+
+ StreamingIntegrationTester sit = new StreamingIntegrationTester(db, table, uri,
+ txnsPerBatch, writers, batches, recordsPerTxn, frequency, abortPct, partVals, cols, types, pause);
+ sit.go();
+ }
+
+ static void usage(Options options) {
+ HelpFormatter hf = new HelpFormatter();
+ hf.printHelp(HelpFormatter.DEFAULT_WIDTH, "sit [options]", "Usage: ", options, "");
+ System.exit(-1);
+ }
+
+ private String user;
+ private String db;
+ private String table;
+ private String uri;
+ private int txnsPerBatch;
+ private int writers;
+ private int batches;
+ private int recordsPerTxn;
+ private int frequency;
+ private float abortPct;
+ private String[] partVals;
+ private String[] cols;
+ private String[] types;
+ private boolean pause;
+
+
+ private StreamingIntegrationTester(String db, String table, String uri, int txnsPerBatch,
+ int writers, int batches, int recordsPerTxn,
+ int frequency, float abortPct, String[] partVals,
+ String[] cols, String[] types, boolean pause) {
+ this.db = db;
+ this.table = table;
+ this.uri = uri;
+ this.txnsPerBatch = txnsPerBatch;
+ this.writers = writers;
+ this.batches = batches;
+ this.recordsPerTxn = recordsPerTxn;
+ this.frequency = frequency;
+ this.abortPct = abortPct;
+ this.partVals = partVals;
+ this.cols = cols;
+ this.types = types;
+ this.pause = pause;
+ }
+
+ private void go() {
+ HiveEndPoint endPoint = null;
+ try {
+ if(partVals == null) {
+ endPoint = new HiveEndPoint(uri, db, table, null);
+ } else {
+ endPoint = new HiveEndPoint(uri, db, table, Arrays.asList(partVals));
+ }
+
+ for (int i = 0; i < writers; i++) {
+ Writer w = new Writer(endPoint, user, i, txnsPerBatch, batches, recordsPerTxn, frequency, abortPct,
+ cols, types, pause);
+ w.start();
+ }
+
+ } catch (Throwable t) {
+ System.err.println("Caught exception while testing: " + StringUtils.stringifyException(t));
+ }
+ }
+
+ private static class Writer extends Thread {
+ private HiveEndPoint endPoint;
+ private final String user;
+ private int txnsPerBatch;
+ private int batches;
+ private int writerNumber;
+ private int recordsPerTxn;
+ private int frequency;
+ private float abortPct;
+ private String[] cols;
+ private String[] types;
+ private boolean pause;
+ private Random rand;
+
+ Writer(HiveEndPoint endPoint, String user, int writerNumber, int txnsPerBatch, int batches,
+ int recordsPerTxn, int frequency, float abortPct, String[] cols, String[] types, boolean pause) {
+ this.endPoint = endPoint;
+ this.user = user;
+ this.txnsPerBatch = txnsPerBatch;
+ this.batches = batches;
+ this.writerNumber = writerNumber;
+ this.recordsPerTxn = recordsPerTxn;
+ this.frequency = frequency;
+ this.abortPct = abortPct;
+ this.cols = cols;
+ this.types = types;
+ this.pause = pause;
+ rand = new Random();
+ }
+
+ @Override
+ public void run() {
+ StreamingConnection conn = null;
+ try {
+ conn = endPoint.newConnection(true);
+ RecordWriter writer = new DelimitedInputWriter(cols, ",", endPoint);
+
+ long start = System.currentTimeMillis();
+ for (int i = 0; i < batches; i++) {
+ LOG.info("Starting batch " + i);
+ TransactionBatch batch = conn.fetchTransactionBatch(txnsPerBatch, writer);
+ try {
+ while (batch.remainingTransactions() > 0) {
+ batch.beginNextTransaction();
+ for (int j = 0; j < recordsPerTxn; j++) {
+ batch.write(generateRecord(cols, types));
+ }
+ if (rand.nextFloat() < abortPct) batch.abort();
+ else
+ batch.commit();
+ if(pause) {
+ System.out.println("Writer " + writerNumber + " committed... press Enter to continue. " + Thread.currentThread().getId());
+ System.in.read();
+ }
+ }
+ long end = System.currentTimeMillis();
+ if (end - start < frequency) Thread.sleep(frequency - (end - start));
+ } finally {
+ batch.close();
+ if(pause) {
+ System.out.println("Writer " + writerNumber + " has closed a Batch.. press Enter to continue. " + Thread.currentThread().getId());
+ System.in.read();
+ }
+ }
+ }
+ } catch (Throwable t) {
+ System.err.println("Writer number " + writerNumber + " caught exception while testing: " +
+ StringUtils.stringifyException(t));
+ } finally {
+ if(conn!=null) conn.close();
+ }
+ }
+
+ private byte[] generateRecord(String[] cols, String[] types) {
+ // TODO make it so I can randomize the column order
+
+ StringBuilder buf = new StringBuilder();
+ for (int i = 0; i < types.length; i++) {
+ buf.append(generateColumn(types[i]));
+ buf.append(",");
+ }
+ return buf.toString().getBytes();
+ }
+
+ private String generateColumn(String type) {
+ if ("string".equals(type.toLowerCase())) {
+ return "When that Aprilis with his showers swoot";
+ } else if (type.toLowerCase().startsWith("int")) {
+ return "42";
+ } else if (type.toLowerCase().startsWith("dec") || type.toLowerCase().equals("float")) {
+ return "3.141592654";
+ } else if (type.toLowerCase().equals("datetime")) {
+ return "2014-03-07 15:33:22";
+ } else if (type.toLowerCase().equals("date")) {
+ return "1955-11-12";
+ } else {
+ throw new RuntimeException("Sorry, I don't know the type " + type);
+ }
+ }
+ }
+}
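For orientation, one plausible way to drive the tester above programmatically (the sit launcher script at the end of this patch does the same from the command line); all option values below are made-up examples.

    // Hypothetical invocation of the tester; assumes a metastore at thrift://localhost:9083 and a
    // bucketed ORC table testing.alerts(id int, msg string) partitioned by (continent, country).
    class SitInvocationSketch {
      public static void main(String[] args) {
        org.apache.hive.streaming.StreamingIntegrationTester.main(new String[]{
            "-m", "thrift://localhost:9083",   // metastore URI (required)
            "-d", "testing",                   // database (required)
            "-t", "alerts",                    // table (required)
            "-c", "id,msg",                    // column names (required)
            "-s", "int,string",                // column types (required)
            "-p", "Asia,India",                // partition values, in partition-column order
            "-i", "2",                         // batches to write
            "-n", "10",                        // transactions per batch
            "-r", "100",                       // records per transaction
            "-w", "2"                          // writer threads
        });
      }
    }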
diff --git streaming/src/test/org/apache/hive/streaming/TestDelimitedInputWriter.java streaming/src/test/org/apache/hive/streaming/TestDelimitedInputWriter.java
new file mode 100644
index 0000000..b90f95d
--- /dev/null
+++ streaming/src/test/org/apache/hive/streaming/TestDelimitedInputWriter.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+import com.google.common.collect.Lists;
+import junit.framework.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+
+public class TestDelimitedInputWriter {
+ @Test
+ public void testFieldReordering() throws Exception {
+
+ ArrayList<String> colNames = Lists.newArrayList(new String[]{"col1", "col2", "col3", "col4", "col5"});
+ {//1) test dropping fields - first middle & last
+ String[] fieldNames = {null, "col2", null, "col4", null};
+ int[] mapping = DelimitedInputWriter.getFieldReordering(fieldNames, colNames);
+ Assert.assertTrue(Arrays.equals(mapping, new int[]{-1, 1, -1, 3, -1}));
+ }
+
+ {//2) test reordering
+ String[] fieldNames = {"col5", "col4", "col3", "col2", "col1"};
+ int[] mapping = DelimitedInputWriter.getFieldReordering(fieldNames, colNames);
+ Assert.assertTrue( Arrays.equals(mapping, new int[]{4,3,2,1,0}) );
+ }
+
+ {//3) test bad field names
+ String[] fieldNames = {"xyz", "abc", "col3", "col4", "as"};
+ try {
+ DelimitedInputWriter.getFieldReordering(fieldNames, colNames);
+ Assert.fail();
+ } catch (InvalidColumn e) {
+ // should throw
+ }
+ }
+
+ {//4) test few field names
+ String[] fieldNames = {"col3", "col4"};
+ int[] mapping = DelimitedInputWriter.getFieldReordering(fieldNames, colNames);
+ Assert.assertTrue( Arrays.equals(mapping, new int[]{2,3}) );
+ }
+
+ {//5) test extra field names
+ String[] fieldNames = {"col5", "col4", "col3", "col2", "col1", "col1"};
+ try {
+ DelimitedInputWriter.getFieldReordering(fieldNames, colNames);
+ Assert.fail();
+ } catch (InvalidColumn e) {
+ // should throw
+ }
+ }
+ }
+}
diff --git streaming/src/test/org/apache/hive/streaming/TestStreaming.java streaming/src/test/org/apache/hive/streaming/TestStreaming.java
new file mode 100644
index 0000000..206adb6
--- /dev/null
+++ streaming/src/test/org/apache/hive/streaming/TestStreaming.java
@@ -0,0 +1,733 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+import junit.framework.Assert;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.io.HiveInputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcSerde;
+import org.apache.hadoop.hive.ql.io.orc.OrcStruct;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.thrift.TException;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+
+public class TestStreaming {
+
+ private static final String COL1 = "id";
+ private static final String COL2 = "msg";
+
+ private final HiveConf conf;
+ private final IMetaStoreClient msClient;
+ private final Hive hive;
+
+ //public boolean local = false;
+ //private final int port ;
+// final String metaStoreURI = "thrift://172.16.0.21:9083";
+ final String metaStoreURI = null;
+
+ // partitioned table
+ private final static String dbName = "testing";
+ private final static String tblName = "alerts";
+ private final static String[] fieldNames = new String[]{COL1,COL2};
+ List<String> partitionVals;
+ private static String partLocation;
+
+ // unpartitioned table
+ private final static String dbName2 = "testing";
+ private final static String tblName2 = "alerts";
+ private final static String[] fieldNames2 = new String[]{COL1,COL2};
+
+ private final String PART1_CONTINENT = "Asia";
+ private final String PART1_COUNTRY = "India";
+
+
+ //private Driver driver;
+ public TestStreaming() throws Exception {
+ partitionVals = new ArrayList<String>(2);
+ partitionVals.add(PART1_CONTINENT);
+ partitionVals.add(PART1_COUNTRY);
+
+ //port = MetaStoreUtils.findFreePort();
+
+ conf = new HiveConf(this.getClass());
+ TxnDbUtil.setConfValues(conf);
+ if(metaStoreURI!=null) {
+ conf.setVar(HiveConf.ConfVars.METASTOREURIS, metaStoreURI);
+ }
+ conf.setBoolVar(HiveConf.ConfVars.METASTORE_EXECUTE_SET_UGI, true);
+ conf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, true);
+
+ /*
+ if(local) {
+ //1) Start from a clean slate (metastore)
+ TxnDbUtil.cleanDb();
+ TxnDbUtil.prepDb();
+ */
+ //1) Start from a clean slate (metastore)
+ TxnDbUtil.cleanDb();
+ TxnDbUtil.prepDb();
+
+ //2) Start Hive Metastore on separate thread
+ //MetaStoreUtils.startMetaStore(port, ShimLoader.getHadoopThriftAuthBridge(), conf);
+
+ //3) obtain metastore clients
+ hive = Hive.get(conf);
+ msClient = hive.getMSC();
+ //SessionState.start(new CliSessionState(conf));
+ //driver = new Driver(conf);
+ }
+
+ @Before
+ public void setup() throws Exception {
+ // drop and recreate the necessary databases and tables
+ dropDB(msClient, dbName);
+ createDbAndTable(msClient, dbName, tblName, partitionVals);
+
+ dropDB(msClient, dbName2);
+ createDbAndTable(msClient, dbName2, tblName2, partitionVals);
+ }
+
+ private void printResults(ArrayList<String> res) {
+ for(String s: res) {
+ System.out.println(s);
+ }
+ System.out.println("Total records: " + res.size());
+ }
+
+ private static List<FieldSchema> getPartitionKeys() {
+ List<FieldSchema> fields = new ArrayList<FieldSchema>();
+ // Defining partition names in unsorted order
+ fields.add(new FieldSchema("continent", serdeConstants.STRING_TYPE_NAME, ""));
+ fields.add(new FieldSchema("country", serdeConstants.STRING_TYPE_NAME, ""));
+ return fields;
+ }
+
+ private void checkDataWritten(long minTxn, long maxTxn, int buckets, int numExpectedFiles,
+ String... records) throws Exception {
+ ValidTxnList txns = msClient.getValidTxns();
+ AcidUtils.Directory dir = AcidUtils.getAcidState(new Path(partLocation), conf, txns);
+ Assert.assertEquals(0, dir.getObsolete().size());
+ Assert.assertEquals(0, dir.getOriginalFiles().size());
+ List<AcidUtils.ParsedDelta> current = dir.getCurrentDirectories();
+ System.out.println("Files found: ");
+ for (AcidUtils.ParsedDelta pd : current) System.out.println(pd.getPath().toString());
+ Assert.assertEquals(numExpectedFiles, current.size());
+
+ // find the absolute minimum and maximum transaction ids across the deltas
+ long min = Long.MAX_VALUE;
+ long max = Long.MIN_VALUE;
+ for (AcidUtils.ParsedDelta pd : current) {
+ if (pd.getMaxTransaction() > max) max = pd.getMaxTransaction();
+ if (pd.getMinTransaction() < min) min = pd.getMinTransaction();
+ }
+ Assert.assertEquals(minTxn, min);
+ Assert.assertEquals(maxTxn, max);
+
+ InputFormat<NullWritable, OrcStruct> inf = new OrcInputFormat();
+ JobConf job = new JobConf();
+ job.set("mapred.input.dir", partLocation.toString());
+ job.set("bucket_count", Integer.toString(buckets));
+ job.set(ValidTxnList.VALID_TXNS_KEY, txns.toString());
+ InputSplit[] splits = inf.getSplits(job, 1);
+ Assert.assertEquals(1, splits.length);
+ org.apache.hadoop.mapred.RecordReader<NullWritable, OrcStruct> rr =
+ inf.getRecordReader(splits[0], job, Reporter.NULL);
+
+ NullWritable key = rr.createKey();
+ OrcStruct value = rr.createValue();
+ for(int i = 0; i < records.length; i++) {
+ Assert.assertEquals(true, rr.next(key, value));
+ Assert.assertEquals(records[i], value.toString());
+ }
+ Assert.assertEquals(false, rr.next(key, value));
+ }
+
+ private void checkNothingWritten() throws Exception {
+ ValidTxnList txns = msClient.getValidTxns();
+ AcidUtils.Directory dir = AcidUtils.getAcidState(new Path(partLocation), conf, txns);
+ Assert.assertEquals(0, dir.getObsolete().size());
+ Assert.assertEquals(0, dir.getOriginalFiles().size());
+ List<AcidUtils.ParsedDelta> current = dir.getCurrentDirectories();
+ Assert.assertEquals(0, current.size());
+ }
+
+ @Test
+ public void testEndpointConnection() throws Exception {
+ // 1) Basic
+ HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName
+ , partitionVals);
+ StreamingConnection connection = endPt.newConnection(false); //shouldn't throw
+ connection.close();
+
+ // 2) Leave partition unspecified
+ endPt = new HiveEndPoint(metaStoreURI, dbName, tblName, null);
+ endPt.newConnection(false).close(); // should not throw
+ }
+
+ @Test
+ public void testAddPartition() throws Exception {
+ List<String> newPartVals = new ArrayList<String>(2);
+ newPartVals.add(PART1_CONTINENT);
+ newPartVals.add("Nepal");
+
+ HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName
+ , newPartVals);
+
+ // Ensure partition is absent
+ try {
+ msClient.getPartition(endPt.database, endPt.table, endPt.partitionVals);
+ Assert.assertTrue("Partition already exists", false);
+ } catch (NoSuchObjectException e) {
+ // expect this exception
+ }
+
+ // Create partition
+ Assert.assertNotNull(endPt.newConnection(true));
+
+ // Ensure partition is present
+ Partition p = msClient.getPartition(endPt.database, endPt.table, endPt.partitionVals);
+ Assert.assertNotNull("Did not find added partition", p);
+ }
+
+ @Test
+ public void testTransactionBatchEmptyCommit() throws Exception {
+ // 1) to partitioned table
+ HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName,
+ partitionVals);
+ DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt);
+ StreamingConnection connection = endPt.newConnection(false);
+
+ TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer);
+ txnBatch.beginNextTransaction();
+ txnBatch.commit();
+ Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
+ , txnBatch.getCurrentTransactionState());
+ txnBatch.close();
+ connection.close();
+
+ // 2) To unpartitioned table
+ endPt = new HiveEndPoint(metaStoreURI, dbName2, tblName2, null);
+ writer = new DelimitedInputWriter(fieldNames2,",", endPt);
+ connection = endPt.newConnection(false);
+
+ txnBatch = connection.fetchTransactionBatch(10, writer);
+ txnBatch.beginNextTransaction();
+ txnBatch.commit();
+ Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
+ , txnBatch.getCurrentTransactionState());
+ txnBatch.close();
+ connection.close();
+ }
+
+ @Test
+ public void testTransactionBatchEmptyAbort() throws Exception {
+ // 1) to partitioned table
+ HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName,
+ partitionVals);
+ DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt);
+ StreamingConnection connection = endPt.newConnection(true);
+
+ TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer);
+ txnBatch.beginNextTransaction();
+ txnBatch.abort();
+ Assert.assertEquals(TransactionBatch.TxnState.ABORTED
+ , txnBatch.getCurrentTransactionState());
+ txnBatch.close();
+ connection.close();
+
+ // 2) to unpartitioned table
+ endPt = new HiveEndPoint(metaStoreURI, dbName2, tblName2, null);
+ writer = new DelimitedInputWriter(fieldNames,",", endPt);
+ connection = endPt.newConnection(true);
+
+ txnBatch = connection.fetchTransactionBatch(10, writer);
+ txnBatch.beginNextTransaction();
+ txnBatch.abort();
+ Assert.assertEquals(TransactionBatch.TxnState.ABORTED
+ , txnBatch.getCurrentTransactionState());
+ txnBatch.close();
+ connection.close();
+ }
+
+ @Test
+ public void testTransactionBatchCommit() throws Exception {
+ HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName,
+ partitionVals);
+ DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt);
+ StreamingConnection connection = endPt.newConnection(true);
+
+ // 1st Txn
+ TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer);
+ txnBatch.beginNextTransaction();
+ Assert.assertEquals(TransactionBatch.TxnState.OPEN
+ , txnBatch.getCurrentTransactionState());
+ txnBatch.write("1,Hello streaming".getBytes());
+ txnBatch.commit();
+
+ checkDataWritten(1, 10, 1, 1, "{1, Hello streaming}");
+
+ Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
+ , txnBatch.getCurrentTransactionState());
+
+ // 2nd Txn
+ txnBatch.beginNextTransaction();
+ Assert.assertEquals(TransactionBatch.TxnState.OPEN
+ , txnBatch.getCurrentTransactionState());
+ txnBatch.write("2,Welcome to streaming".getBytes());
+
+ // data should not be visible
+ checkDataWritten(1, 10, 1, 1, "{1, Hello streaming}");
+
+ txnBatch.commit();
+
+ checkDataWritten(1, 10, 1, 1, "{1, Hello streaming}",
+ "{2, Welcome to streaming}");
+
+ txnBatch.close();
+ Assert.assertEquals(TransactionBatch.TxnState.INACTIVE
+ , txnBatch.getCurrentTransactionState());
+
+
+ connection.close();
+
+
+ // To Unpartitioned table
+ endPt = new HiveEndPoint(metaStoreURI, dbName2, tblName2, null);
+ writer = new DelimitedInputWriter(fieldNames,",", endPt);
+ connection = endPt.newConnection(true);
+
+ // 1st Txn
+ txnBatch = connection.fetchTransactionBatch(10, writer);
+ txnBatch.beginNextTransaction();
+ Assert.assertEquals(TransactionBatch.TxnState.OPEN
+ , txnBatch.getCurrentTransactionState());
+ txnBatch.write("1,Hello streaming".getBytes());
+ txnBatch.commit();
+
+ Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
+ , txnBatch.getCurrentTransactionState());
+ connection.close();
+ }
+
+ @Test
+ public void testRemainingTransactions() throws Exception {
+ HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName,
+ partitionVals);
+ DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt);
+ StreamingConnection connection = endPt.newConnection(true);
+
+ // 1) test with txn.Commit()
+ TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer);
+ int batch=0;
+ int initialCount = txnBatch.remainingTransactions();
+ while(txnBatch.remainingTransactions()>0) {
+ txnBatch.beginNextTransaction();
+ Assert.assertEquals(--initialCount, txnBatch.remainingTransactions());
+ for (int rec=0; rec<2; ++rec) {
+ Assert.assertEquals(TransactionBatch.TxnState.OPEN
+ , txnBatch.getCurrentTransactionState());
+ txnBatch.write((batch * rec + ",Hello streaming").getBytes());
+ }
+ txnBatch.commit();
+ Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
+ , txnBatch.getCurrentTransactionState());
+ ++batch;
+ }
+ Assert.assertEquals(0,txnBatch.remainingTransactions());
+ txnBatch.close();
+
+ Assert.assertEquals(TransactionBatch.TxnState.INACTIVE
+ , txnBatch.getCurrentTransactionState());
+
+ // 2) test with txn.Abort()
+ txnBatch = connection.fetchTransactionBatch(10, writer);
+ batch=0;
+ initialCount = txnBatch.remainingTransactions();
+ while(txnBatch.remainingTransactions()>0) {
+ txnBatch.beginNextTransaction();
+ Assert.assertEquals(--initialCount,txnBatch.remainingTransactions());
+ for (int rec=0; rec<2; ++rec) {
+ Assert.assertEquals(TransactionBatch.TxnState.OPEN
+ , txnBatch.getCurrentTransactionState());
+ txnBatch.write((batch * rec + ",Hello streaming").getBytes());
+ }
+ txnBatch.abort();
+ Assert.assertEquals(TransactionBatch.TxnState.ABORTED
+ , txnBatch.getCurrentTransactionState());
+ ++batch;
+ }
+ Assert.assertEquals(0,txnBatch.remainingTransactions());
+ txnBatch.close();
+
+ Assert.assertEquals(TransactionBatch.TxnState.INACTIVE
+ , txnBatch.getCurrentTransactionState());
+
+ connection.close();
+ }
+
+ @Test
+ public void testTransactionBatchAbort() throws Exception {
+
+ HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName,
+ partitionVals);
+ DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt);
+ StreamingConnection connection = endPt.newConnection(false);
+
+
+ TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer);
+ txnBatch.beginNextTransaction();
+ txnBatch.write("1,Hello streaming".getBytes());
+ txnBatch.write("2,Welcome to streaming".getBytes());
+ txnBatch.abort();
+
+ checkNothingWritten();
+
+ Assert.assertEquals(TransactionBatch.TxnState.ABORTED
+ , txnBatch.getCurrentTransactionState());
+
+ txnBatch.close();
+ connection.close();
+
+ checkNothingWritten();
+
+ }
+
+
+ @Test
+ public void testTransactionBatchAbortAndCommit() throws Exception {
+
+ HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName,
+ partitionVals);
+ DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt);
+ StreamingConnection connection = endPt.newConnection(false);
+
+ TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer);
+ txnBatch.beginNextTransaction();
+ txnBatch.write("1,Hello streaming".getBytes());
+ txnBatch.write("2,Welcome to streaming".getBytes());
+ txnBatch.abort();
+
+ checkNothingWritten();
+
+ Assert.assertEquals(TransactionBatch.TxnState.ABORTED
+ , txnBatch.getCurrentTransactionState());
+
+ txnBatch.beginNextTransaction();
+ txnBatch.write("1,Hello streaming".getBytes());
+ txnBatch.write("2,Welcome to streaming".getBytes());
+ txnBatch.commit();
+
+ checkDataWritten(1, 10, 1, 1, "{1, Hello streaming}",
+ "{2, Welcome to streaming}");
+
+ txnBatch.close();
+ connection.close();
+ }
+
+ @Test
+ public void testMultipleTransactionBatchCommits() throws Exception {
+ HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName,
+ partitionVals);
+ DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt);
+ StreamingConnection connection = endPt.newConnection(true);
+
+ TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer);
+ txnBatch.beginNextTransaction();
+ txnBatch.write("1,Hello streaming".getBytes());
+ txnBatch.commit();
+
+ checkDataWritten(1, 10, 1, 1, "{1, Hello streaming}");
+
+ txnBatch.beginNextTransaction();
+ txnBatch.write("2,Welcome to streaming".getBytes());
+ txnBatch.commit();
+
+ checkDataWritten(1, 10, 1, 1, "{1, Hello streaming}",
+ "{2, Welcome to streaming}");
+
+ txnBatch.close();
+
+ // 2nd Txn Batch
+ txnBatch = connection.fetchTransactionBatch(10, writer);
+ txnBatch.beginNextTransaction();
+ txnBatch.write("3,Hello streaming - once again".getBytes());
+ txnBatch.commit();
+
+ checkDataWritten(1, 20, 1, 2, "{1, Hello streaming}",
+ "{2, Welcome to streaming}", "{3, Hello streaming - once again}");
+
+ txnBatch.beginNextTransaction();
+ txnBatch.write("4,Welcome to streaming - once again".getBytes());
+ txnBatch.commit();
+
+ checkDataWritten(1, 20, 1, 2, "{1, Hello streaming}",
+ "{2, Welcome to streaming}", "{3, Hello streaming - once again}",
+ "{4, Welcome to streaming - once again}");
+
+ Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
+ , txnBatch.getCurrentTransactionState());
+
+ txnBatch.close();
+
+ connection.close();
+ }
+
+
+ @Test
+ public void testInterleavedTransactionBatchCommits() throws Exception {
+ HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName,
+ partitionVals);
+ DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames, ",", endPt);
+ StreamingConnection connection = endPt.newConnection(false);
+
+ // Acquire 1st Txn Batch
+ TransactionBatch txnBatch1 = connection.fetchTransactionBatch(10, writer);
+ txnBatch1.beginNextTransaction();
+
+ // Acquire 2nd Txn Batch
+ DelimitedInputWriter writer2 = new DelimitedInputWriter(fieldNames, ",", endPt);
+ TransactionBatch txnBatch2 = connection.fetchTransactionBatch(10, writer2);
+ txnBatch2.beginNextTransaction();
+
+ // Interleaved writes to both batches
+ txnBatch1.write("1,Hello streaming".getBytes());
+ txnBatch2.write("3,Hello streaming - once again".getBytes());
+
+ checkNothingWritten();
+
+ txnBatch2.commit();
+
+ checkDataWritten(11, 20, 1, 1, "{3, Hello streaming - once again}");
+
+ txnBatch1.commit();
+
+ checkDataWritten(1, 20, 1, 2, "{1, Hello streaming}", "{3, Hello streaming - once again}");
+
+ txnBatch1.beginNextTransaction();
+ txnBatch1.write("2,Welcome to streaming".getBytes());
+
+ txnBatch2.beginNextTransaction();
+ txnBatch2.write("4,Welcome to streaming - once again".getBytes());
+
+ checkDataWritten(1, 20, 1, 2, "{1, Hello streaming}", "{3, Hello streaming - once again}");
+
+ txnBatch1.commit();
+
+ checkDataWritten(1, 20, 1, 2, "{1, Hello streaming}",
+ "{2, Welcome to streaming}",
+ "{3, Hello streaming - once again}");
+
+ txnBatch2.commit();
+
+ checkDataWritten(1, 20, 1, 2, "{1, Hello streaming}",
+ "{2, Welcome to streaming}",
+ "{3, Hello streaming - once again}",
+ "{4, Welcome to streaming - once again}");
+
+ Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
+ , txnBatch1.getCurrentTransactionState());
+ Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
+ , txnBatch2.getCurrentTransactionState());
+
+ txnBatch1.close();
+ txnBatch2.close();
+
+ connection.close();
+ }
+
+ class WriterThd extends Thread {
+
+ private StreamingConnection conn;
+ private HiveEndPoint ep;
+ private DelimitedInputWriter writer;
+ private String data;
+
+ WriterThd(HiveEndPoint ep, String data) throws Exception {
+ String uri = "thrift://172.16.0.21:9083";
+ this.ep = ep;
+ writer = new DelimitedInputWriter(fieldNames, ",", ep);
+ conn = ep.newConnection(false);
+ this.data = data;
+ }
+
+ WriterThd(StreamingConnection conn, HiveEndPoint ep, DelimitedInputWriter writer, String data) {
+ this.conn = conn;
+ this.ep = ep;
+ this.writer = writer;
+ this.data = data;
+ }
+ @Override
+ public void run() {
+ TransactionBatch txnBatch = null;
+ try {
+ txnBatch = conn.fetchTransactionBatch(1000, writer);
+ while(txnBatch.remainingTransactions() > 0) {
+ txnBatch.beginNextTransaction();
+ txnBatch.write(data.getBytes());
+ txnBatch.write(data.getBytes());
+ txnBatch.commit();
+ } // while
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ } finally {
+ if (txnBatch != null) {
+ try {
+ txnBatch.close();
+ } catch (Exception e) {
+ conn.close();
+ throw new RuntimeException(e);
+ }
+ }
+ try {
+ conn.close();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+
+ }
+ }
+ }
+
+ @Test
+ public void testConcurrentTransactionBatchCommits() throws Exception {
+ final HiveEndPoint ep = new HiveEndPoint(metaStoreURI, dbName, tblName, partitionVals);
+ WriterThd t1 = new WriterThd(ep, "1,Matrix");
+ WriterThd t2 = new WriterThd(ep, "2,Gandhi");
+ WriterThd t3 = new WriterThd(ep, "3,Silence");
+
+ t1.start();
+ t2.start();
+ t3.start();
+
+ t1.join();
+ t2.join();
+ t3.join();
+
+ }
+
+ // delete db and all tables in it
+ public static void dropDB(IMetaStoreClient client, String databaseName) {
+ try {
+ for(String table : client.listTableNamesByFilter(databaseName, "", (short)-1)) {
+ client.dropTable(databaseName, table, true, true);
+ }
+ client.dropDatabase(databaseName);
+ } catch (TException e) {
+ }
+
+ }
+
+ public static void createDbAndTable(IMetaStoreClient client, String databaseName,
+ String tableName, List<String> partVals)
+ throws Exception {
+ Database db = new Database();
+ db.setName(databaseName);
+ client.createDatabase(db);
+
+ Table tbl = new Table();
+ tbl.setDbName(databaseName);
+ tbl.setTableName(tableName);
+ tbl.setTableType(TableType.MANAGED_TABLE.toString());
+ StorageDescriptor sd = new StorageDescriptor();
+ sd.setCols(getTableColumns());
+ sd.setNumBuckets(1);
+ tbl.setPartitionKeys(getPartitionKeys());
+
+ tbl.setSd(sd);
+
+ sd.setBucketCols(new ArrayList<String>(2));
+ sd.setSerdeInfo(new SerDeInfo());
+ sd.getSerdeInfo().setName(tbl.getTableName());
+ sd.getSerdeInfo().setParameters(new HashMap<String, String>());
+ sd.getSerdeInfo().getParameters().put(serdeConstants.SERIALIZATION_FORMAT, "1");
+
+ sd.getSerdeInfo().setSerializationLib(OrcSerde.class.getName());
+ sd.setInputFormat(HiveInputFormat.class.getName());
+ sd.setOutputFormat(OrcOutputFormat.class.getName());
+
+ Map<String, String> tableParams = new HashMap<String, String>();
+ tbl.setParameters(tableParams);
+ client.createTable(tbl);
+
+ try {
+ addPartition(client, tbl, partVals);
+ } catch (AlreadyExistsException e) {
+ }
+ Partition createdPartition = client.getPartition(databaseName, tableName, partVals);
+ partLocation = createdPartition.getSd().getLocation();
+ }
+
+ /*
+ private void descPart() throws CommandNeedRetryException, IOException {
+ driver.run("describe formatted " + dbName + "." + tblName);
+ ArrayList res = new ArrayList();
+ driver.getResults(res);
+ printResults(res);
+ }
+ */
+
+
+ private static void addPartition(IMetaStoreClient client, Table tbl
+ , List<String> partValues)
+ throws IOException, TException {
+ Partition part = new Partition();
+ part.setDbName(dbName);
+ part.setTableName(tblName);
+ part.setSd(tbl.getSd());
+ part.setValues(partValues);
+ client.add_partition(part);
+ }
+
+ private static List<FieldSchema> getTableColumns() {
+ List<FieldSchema> fields = new ArrayList<FieldSchema>();
+ fields.add(new FieldSchema(COL1, serdeConstants.INT_TYPE_NAME, ""));
+ fields.add(new FieldSchema(COL2, serdeConstants.STRING_TYPE_NAME, ""));
+ return fields;
+ }
+}
diff --git streaming/src/test/sit streaming/src/test/sit
new file mode 100644
index 0000000..8116724
--- /dev/null
+++ streaming/src/test/sit
@@ -0,0 +1,34 @@
+#!/bin/sh
+
+if [ "${HADOOP_HOME}x" == "x" ]
+ then
+ echo "Please set HADOOP_HOME";
+ exit 1
+fi
+
+if [ "${HIVE_HOME}x" == "x" ]
+ then
+ echo "Please set HIVE_HOME";
+ exit 1
+fi
+
+if [ "${JAVA_HOME}x" == "x" ]
+ then
+ echo "Please set JAVA_HOME";
+ exit 1
+fi
+
+for jar in ${HADOOP_HOME}/client/*.jar
+ do
+ CLASSPATH=${CLASSPATH}:$jar
+done
+
+for jar in ${HIVE_HOME}/lib/*.jar
+ do
+ CLASSPATH=${CLASSPATH}:$jar
+done
+
+CLASSPATH=${CLASSPATH}:${HADOOP_HOME}/conf
+# don't put hive-site in the classpath
+
+$JAVA_HOME/bin/java -cp ${CLASSPATH} org.apache.hive.streaming.StreamingIntegrationTester "$@"